/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
**************************************************************************
*
* File:         PartFunc.cpp
* Description:  Partitioning Function
* Created:      11/16/1994
* Language:     C++
*
*
*
*
**************************************************************************
*/

// -----------------------------------------------------------------------

#include "PartReq.h"
#include "PhyProp.h"
#include "ItemColRef.h"
#include "ItemLog.h"
#include "ItemFunc.h"
#include "ItemOther.h"
#include "ItemArith.h"
#include "opt.h"
#include "str.h"
#include "NumericType.h"
#include "MiscType.h"
#include "NAFileSet.h"
#include "SearchKey.h"
#include "GroupAttr.h"
#include "Generator.h"

     
// To test hash2 part number generated by a direct call to 
// ExHDHash::hash() and compare the result, just uncomment out
// the following two lines.
//#include "Generator.h"

#include "exp_function.h"

// ***********************************************************************
// A function having an external linkage to allow display() to
// be called on a tree object. This is a workaround for bugs/missing
// functionality in ObjectCenter that cause display() to become
// an undefined symbol.
// ***********************************************************************

void displayPartitioningFunction(const PartitioningFunction& pf)
{
  pf.display();
}

void displayPartitioningFunction(const PartitioningFunction* pf)
{
  if (pf)
    pf->display();
}

void displayPartitionBoundaries(const RangePartitionBoundaries& pb)
{
  pb.display();
}

void displayPartitionBoundaries(const RangePartitionBoundaries* pb)
{
  if (pb)
    pb->display();
}


// ***********************************************************************
// PartitioningFunction
// ***********************************************************************

//------------------------------------------------------------------------
// PartitioningFunction destructor.
//------------------------------------------------------------------------
PartitioningFunction::~PartitioningFunction()
{
  if (nodeMap_ != NULL)
    {
      delete nodeMap_;
    }

} //PartitioningFunction destructor

// -----------------------------------------------------------------------
// Methods for perform type-safe pointer casts.
// -----------------------------------------------------------------------
const LogPhysPartitioningFunction*
PartitioningFunction::castToLogPhysPartitioningFunction() const
                                                          { return NULL; }

const SinglePartitionPartitioningFunction*
PartitioningFunction::castToSinglePartitionPartitioningFunction() const
                                                          { return NULL; }

const ReplicateViaBroadcastPartitioningFunction*
PartitioningFunction::castToReplicateViaBroadcastPartitioningFunction() const
                                                          { return NULL; }

const ReplicateNoBroadcastPartitioningFunction*
PartitioningFunction::castToReplicateNoBroadcastPartitioningFunction() const
                                                          { return NULL; }

const HashPartitioningFunction*
PartitioningFunction::castToHashPartitioningFunction() const
                                                          { return NULL; }

const TableHashPartitioningFunction*
PartitioningFunction::castToTableHashPartitioningFunction() const
                                                          { return NULL; }

const HashDistPartitioningFunction*
PartitioningFunction::castToHashDistPartitioningFunction() const
                                                          { return NULL; }

const Hash2PartitioningFunction*
PartitioningFunction::castToHash2PartitioningFunction() const
                                                          { return NULL; }

const RangePartitioningFunction*
PartitioningFunction::castToRangePartitioningFunction() const
                                                          { return NULL; }

const RoundRobinPartitioningFunction*
PartitioningFunction::castToRoundRobinPartitioningFunction() const
                                                          { return NULL; }

const SkewedDataPartitioningFunction*
PartitioningFunction::castToSkewedDataPartitioningFunction() const
                                                          { return NULL; };

const HivePartitioningFunction* 
PartitioningFunction::castToHivePartitioningFunction() const 
                                                          { return NULL; };



// ---------------------------------------------------------------------
// Method to test if the partitioning key contains any approximate
// numeric type columns. Necessary because in some cases certain
// parallel operations do not function properly if the partitioning
// key of the table contains approximate numeric columns.
// ---------------------------------------------------------------------
NABoolean PartitioningFunction::partKeyContainsFloatColumn() const
{
  if (NOT getPartitioningKey().isEmpty())
  {
    ValueIdSet partKeyColumns = getPartitioningKey();
    for (ValueId vid = partKeyColumns.init();
                       partKeyColumns.next(vid);
                       partKeyColumns.advance(vid))
    {
      const NAType& columnType = vid.getType();
      if ((columnType.getTypeQualifier() == NA_NUMERIC_TYPE) AND
          NOT ((NumericType&)columnType).isExact())
      {
        return TRUE;
      }
    } // end for
  } // end if part key is not empty
  return FALSE;
} // partKeyContainsFloatColumn()

// -----------------------------------------------------------------------
// PartitioningFunction::getNodeMap()
// Return base class node map by default.
// -----------------------------------------------------------------------
const NodeMap*
PartitioningFunction::getNodeMap() const        { return nodeMap_;       }

// use any existing nodemap from my req or my child (or synthesize one) that 
// matches my partition count requirement
void 
PartitioningFunction::useNodeMapFromReqOrChild(PartitioningRequirement *req, 
                                               PartitioningFunction *childPF,
                                               NABoolean forESP)
{
  ULng32 partCnt = (ULng32)getCountOfPartitions();
  NodeMap *myNodeMap = NULL;
  if (req->isRequirementFullySpecified() &&
      (CmpCommon::getDefault(COMP_BOOL_87) != DF_ON) && getNodeMap() &&
      (getNodeMap()->getNumEntries() == partCnt)) {
    // we are a copy of a partitioning function that realizes a fully specified
    // partitioning requirement. So, do nothing. our nodemap is good enough.
    return;
  } else if ((CmpCommon::getDefault(COMP_BOOL_87) != DF_ON) &&
             (partCnt == (ULng32)childPF->getCountOfPartitions()) &&
             childPF->getNodeMap() &&
             (childPF->getNodeMap()->getNumEntries() == partCnt)) {
    // we are a copy of a partitioning function that realizes a fuzzy
    // partitioning requirement. childPF is the partitioning function of 
    // the synthesized physical property of the child of an Exchange. So, 
    // our child's partitioning function's nodemap is good enough. 
    myNodeMap = childPF->getNodeMap()->copy();
  } else {
    // only as a last resort do we synthesize a new node map

    // Synthesize a nodemap based on the nodemap of the child and the
    // desired number of ESPs.  Using synthesizeLogicalMap() assumes
    // that the lower and upper ESPs are associated via grouping.  This
    // assumption is not valid when considering the communication
    // patterns between the upper and lower ESPs, but this assumption
    // will lead to a reasonable nodemap for the upper ESPs.
    myNodeMap = ((NodeMap*)childPF->getNodeMap())
      ->synthesizeLogicalMap(partCnt, forESP);
  }
  CMPASSERT(myNodeMap);
  for(CollIndex i = 0; i < partCnt; i++) {
    myNodeMap->setPartitionState(i, NodeMapEntry::ACTIVE);
  }
  CMPASSERT(myNodeMap->getNumActivePartitions() == partCnt);
  replaceNodeMap(myNodeMap);
}

// -----------------------------------------------------------------------
// PartitioningFunction::replaceNodeMap()
// Replace node map with a newly specified node map.
// -----------------------------------------------------------------------
void
PartitioningFunction::replaceNodeMap(NodeMap* nodeMap)
{
  if (nodeMap_ != NULL)
    {
#ifndef NDEBUG
      //assertion for NATable caching.
      //assert if this object is not on the system heap i.e. (collHeap()!=0)
      //and if the heap is not the statementHeap.
      //The assertion would happen if this object is on a NATable heap,
      //since we should not be changing the partitionins function associated
      //with a cached NATable object
      if((collHeap()) && (collHeap() != CmpCommon::statementHeap()))
        CMPASSERT(FALSE);
#endif //NDEBUG
      //Fix for solution 10-040120-2524
      //Unconditionally deleting the nodeMap_
      //was causing problems as the object pointed
      //to by nodeMap_ was being used by other guys also.
      //
      //delete the object if is on the system heap.
      //If it is on the the statement heap then it will
      //be delete when the statement heap is deleted.
      if(!nodeMap_->collHeap())delete nodeMap_;
    }

  nodeMap_ = nodeMap;

} // PartitioningFunction::replaceNodeMap()

// -----------------------------------------------------------------------
// PartitioningFunction::copy()
// Virtual copy constructor returns a copy of myself.
// -----------------------------------------------------------------------
PartitioningFunction* PartitioningFunction::copy() const
{
  // illegal to call copy() of the base class
  CMPABORT;
  return NULL;
}

// -----------------------------------------------------------------------
// PartitioningFunction::normalizePartitioningKeys()
// Rewrite the partitioning keys of the partitioning function in
// terms of the VEGReference for the VEG to which the partitioning
// key column belongs.
// -----------------------------------------------------------------------
void PartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  partitioningKeyColumns_.normalizeNode(normWARef);
  partitioningKeyPredicates_.normalizeNode(normWARef);

  if(partitioningExpression_)
    partitioningExpression_ =
      partitioningExpression_->normalizeNode(normWARef);

  if(partitionSelectionExpr_)
    partitionSelectionExpr_ =
      partitionSelectionExpr_->normalizeNode(normWARef);

} // PartitioningFunction::normalizePartitioningKeys

// --------------------------------------------------------------------
// A method that is used by optimizer for comparing partitioning 
// function with the random number partitioning function i.e. it only 
// compares the partitioning func type and the number of partitions. 
// It does not compare partitioning key.
// --------------------------------------------------------------------
NABoolean
PartitioningFunction::isKnownReplicaPartFunc() const
{
  return
    (
         (isASkewedDataPartitioningFunction() &&
      ((SkewedDataPartitioningFunction*)this)->getSkewProperty().isBroadcasted())
     ||
     isAReplicateViaBroadcastPartitioningFunction()
     ||
     isAReplicateNoBroadcastPartitioningFunction());
}

COMPARE_RESULT 
PartitioningFunction::comparePartFuncsForUnion
                          (const PartitioningFunction &other) const
{
  NABoolean myRepPartFunc = isKnownReplicaPartFunc();
  NABoolean otherRepPartFunc = other.isKnownReplicaPartFunc();
     
  if ( myRepPartFunc || otherRepPartFunc ) return INCOMPATIBLE;

  if (getCountOfPartitions() == other.getCountOfPartitions())
    return SAME;

  return INCOMPATIBLE;
} // PartitioningFunction::comparePartFuncsForUnion

// -----------------------------------------------------------------------
// PartitioningFunction::comparePartFuncToFunc()
// Partitioning function comparison method for hash, round robin, replication,
// and single_partition partitioning functions i.e. everything except regular
// range partitioning functions.
// "Other" must be a partitioning function.
// -----------------------------------------------------------------------
COMPARE_RESULT
PartitioningFunction::comparePartFuncToFunc
                          (const PartitioningFunction &other) const
{
  if ((getPartitioningFunctionType() == other.getPartitioningFunctionType()) AND
      (getCountOfPartitions() == other.getCountOfPartitions()) AND
      (getPartitioningKey() == other.getPartitioningKey())
     )
    return SAME;
  else
    return INCOMPATIBLE;
} // PartitioningFunction::comparePartFuncToFunc

static inline
PartitioningFunction* getPhysPartFunc(const PartitioningFunction* x)
{
  PartitioningFunction* phys = 0;
  if ( x -> isALogPhysPartitioningFunction() ) {
    phys = x -> castToLogPhysPartitioningFunction() ->
      getPhysPartitioningFunction();
  }
  return phys;
}

NABoolean PartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // By default we are not smart and take no risk. However, two
  // partitioning functions that are equal are always a grouping of
  // each other.
  return (comparePartFuncToFunc(other) == SAME);
}

PartitioningRequirement* PartitioningFunction::makePartitioningRequirement()
{
  // Redefine PartitioningFunction::makePartitioningRequirement()
  CMPABORT;
  return NULL;
}

// -----------------------------------------------------------------------
// PartitioningFunction::scaleNumberOfPartitions
// -----------------------------------------------------------------------
PartitioningFunction * PartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  // if we come here this means that the derived class did not
  // override scaleNumberOfPartitions(). In other words, the derived
  // class doesn't know how to change its number of partitions.
  // Therefore, ignore the suggestion and return the current number
  // of partitions, unless we scale down to 1.
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) {
     NodeMap* newNodeMap = 
       new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
     replaceNodeMap(newNodeMap);
  }

  suggestedNewNumberOfPartitions = getCountOfPartitions();
  return this;
}

// -----------------------------------------------------------------------
// PartitioningFunction::copyAndRemap()
// -----------------------------------------------------------------------
PartitioningFunction*
PartitioningFunction::copyAndRemap
                         (ValueIdMap& map, NABoolean mapItUp) const
{
  if (getPartitioningKey().entries() == 0)
    return (PartitioningFunction*)this;
  PartitioningFunction* newPartFunc = copy(); // invoke virtual copy constructor
  newPartFunc->remapIt(this, map, mapItUp);

  return newPartFunc;
} // PartitioningFunction::copyAndRemap()

// -----------------------------------------------------------------------
// PartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void PartitioningFunction::remapIt
                             (const PartitioningFunction* opf,
			      ValueIdMap& map, NABoolean mapItUp)
{
  // Clear because rewrite insists on it being so.
  partitioningKeyColumns_.clear();
  partitioningKeyPredicates_.clear();
  partitionInputValues_.clear();
  partitionInputValuesLayout_.clear();

  if (mapItUp)
    {
      map.rewriteValueIdSetUp(partitioningKeyColumns_,
			      opf->partitioningKeyColumns_);
      map.rewriteValueIdSetUp(partitioningKeyPredicates_,
			      opf->partitioningKeyPredicates_);
      // Note the pivs don't need mapping, they just need to be copied.
      map.rewriteValueIdSetUp(partitionInputValues_,
			      opf->partitionInputValues_);
      map.rewriteValueIdListUp(partitionInputValuesLayout_,
			       opf->partitionInputValuesLayout_);
    }
  else
    {
      map.rewriteValueIdSetDown(opf->partitioningKeyColumns_,
				partitioningKeyColumns_);
      map.rewriteValueIdSetDown(opf->partitioningKeyPredicates_,
				partitioningKeyPredicates_);
      // Note the pivs don't need mapping, they just need to be copied.
      map.rewriteValueIdSetDown(opf->partitionInputValues_,
				partitionInputValues_);
      map.rewriteValueIdListDown(opf->partitionInputValuesLayout_,
				 partitionInputValuesLayout_);
    }

  if (partitioningExpression_)
    partitioningExpression_ = partitioningExpression_->
    mapAndRewrite(map,NOT mapItUp).getItemExpr();

} // PartitioningFunction::remapIt()

// -----------------------------------------------------------------------
// PartitioningFunction::shouldUseSynchronousAccess()
// -----------------------------------------------------------------------
NABoolean PartitioningFunction::shouldUseSynchronousAccess(
    const ReqdPhysicalProperty* rpp,
    const EstLogPropSharedPtr& inputLogProp,
    GroupAttributes* ga) const
{
  // This default implementation handles all partitioning functions
  // except range and single partition partitioning functions.

  // Synchronous access only makes sense for range partitioned tables.
  // So, unless the user is trying to force synchronous access
  // (by disabling asynchronous access), then return FALSE.

  NABoolean shouldUseSynchronousAccess = FALSE;

  //   NOTE: we cannot force synchronous access for non-range partitioned
  // tables if there is a required logical order and/or arrangement!
  // Doing so could lead to incorrect results! So, if the user is trying to
  // force synchronous access, then they are out of luck!

  if (NOT rpp->getLogicalOrderOrArrangementFlag())
  {
    const LogicalPartitioningRequirement *lpr =
      rpp->getLogicalPartRequirement();

    // Don't do synchronous access if the user is forcing a PAPA node
    // and is not forcing the number of PAs
    if ((lpr != NULL) AND lpr->getMustUsePapa() AND
        (lpr->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS))
      return FALSE;

    // Get the value from the defaults table that specifies whether
    // asynchronous access is ok. If it is not, then we need to force
    // synchronous access. Also force synchronous access if the user
    // forced it via C.Q. shape.

    // See if the user is trying to force synchronous access.
    if ((CmpCommon::getDefault(ATTEMPT_ASYNCHRONOUS_ACCESS) == DF_OFF) OR
        ((lpr != NULL) AND
         (lpr->getNumClientsReq() != ANY_NUMBER_OF_PARTITIONS) AND
         (lpr->getNumClientsReq() < getCountOfPartitions())))
    {
      shouldUseSynchronousAccess = TRUE;
    }

  }

  return shouldUseSynchronousAccess;
}


// -----------------------------------------------------------------------
// Virtual functions that must be redefined for derived classes.
// -----------------------------------------------------------------------
Lng32 PartitioningFunction::getCountOfPartitions() const
{
  // Redefine PartitioningFunction::getCountOfPartitions()
  CMPABORT;
  return 1;
}

NABoolean PartitioningFunction::canProducePartitioningKeyPredicates() const
{
  return TRUE; // the most common case is TRUE
}

const ValueIdSet& PartitioningFunction::getPartitioningKeyPredicates() const
{
  CMPASSERT(partKeyPredsCreated_);
  return partitioningKeyPredicates_;
}

const ValueIdSet& PartitioningFunction::getPartitionInputValues() const
{
  CMPASSERT(partKeyPredsCreated_);
  return partitionInputValues_;
}

const ValueIdList& PartitioningFunction::getPartitionInputValuesLayout() const
{
  CMPASSERT(partKeyPredsCreated_);
  return partitionInputValuesLayout_;
}

void PartitioningFunction::createPartitioningKeyPredicates()
{
  // Redefine PartitioningFunction::createPartitioningKeyPredicates()
  CMPABORT;
}

void PartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Redefine PartitioningFunction::replacePivs()
  CMPABORT;
}

PartitioningFunction*
PartitioningFunction::createPartitioningFunctionForIndexDesc
                         (IndexDesc *idesc) const
{
  // Redefine PartitioningFunction::createPartitioningFunctionForIndexDesc()
  CMPABORT;
  return NULL;
}

ItemExpr* PartitioningFunction::createPartitioningExpression()
{
  // Redefine PartitioningFunction::createPartitioningExpression()
  CMPABORT;
  return NULL;
}

void PartitioningFunction::createPartSelectionExprFromSearchKey(
      const ValueId beginPartSelId,
      const ValueId endPartSelId,
      ValueIdList &partSelectionValIds) const
{
  partSelectionValIds.insert(beginPartSelId);
  partSelectionValIds.insert(endPartSelId);
}


// -----------------------------------------------------------------------
// PartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void PartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  ValueIdSet noExternalInputs;
  partitioningKeyColumns_.replaceVEGExpressions
                             (availableValues, noExternalInputs, 
                              FALSE, NULL, TRUE);
  if (partitioningExpression_)
    {
      partitioningExpression_ = partitioningExpression_->replaceVEGExpressions(
      	   availableValues,
      	   noExternalInputs,
	   FALSE,
	   NULL,
	   TRUE); // deep copy because copy constructor made a shallow copy
      partitioningExpression_->synthTypeAndValueId(TRUE);
    }

} // PartitioningFunction::preCodeGen()

const NAString PartitioningFunction::getText() const
{
  CMPABORT;
  return NAString("some type of partitioning function",
                   CmpCommon::statementHeap());
}

void PartitioningFunction::setupForStatement()
{
  if(setupForStatement_)
    return;

  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}

void PartitioningFunction::resetAfterStatement()
{
  if(resetAfterStatement_)
    return;

  partitioningKeyColumns_.clear();
  partitioningKeyPredicates_.clear();
  partitionInputValues_.clear();
  partitionInputValuesLayout_.clear();
  partitionSelectionExprInputs_.clear();
  partKeyPredsCreated_=FALSE;
  assignPartition_ = FALSE;
  partitioningExpression_=NULL;
  dataConversionErrorFlag_=NULL;
  resetAfterStatement_ = TRUE;
  setupForStatement_ = FALSE;
}

// -----------------------------------------------------------------------
// Method for debugging
// -----------------------------------------------------------------------
void PartitioningFunction::print(FILE* ofd, const char* indent,
				 const char* title) const
{
  BUMP_INDENT(indent);

  fprintf(ofd,"%s--Partitioning-Function----------------\n",NEW_INDENT);
  fprintf(ofd,"%s%s (%d)\n",
          NEW_INDENT, title, getPartitioningFunctionType());

  fprintf(ofd,"%snumber of partitions (%d)\n",
          NEW_INDENT, getCountOfPartitions());

  if (NOT partitioningKeyColumns_.isEmpty())
    partitioningKeyColumns_.print(ofd,
				  NEW_INDENT,
				  "Partitioning Key Columns");
  if(partKeyPredsCreated())
    fprintf(ofd,"%sPartition Key Predicates Created\n",NEW_INDENT);

  if (NOT partitioningKeyPredicates_.isEmpty())
    partitioningKeyPredicates_.print(ofd,
				     NEW_INDENT,
				     "Partitioning Key Predicates");
  if (NOT partitionInputValues_.isEmpty())
    partitionInputValues_.print(ofd, NEW_INDENT,
				"Partition Input Values");
  if (NOT partitionInputValuesLayout_.isEmpty())
    partitionInputValuesLayout_.print(ofd, NEW_INDENT,
                                      "Partition Input Values Layout");

  if (getPartitioningExpression()) {
    NAString partExpr("Partitioning Expression\n",  CmpCommon::statementHeap());

    getPartitioningExpression()->
      unparse(partExpr, DEFAULT_PHASE, EXPLAIN_FORMAT);

    fprintf(ofd,partExpr);
  }

  if(assignPartition())
    fprintf(ofd,"%sDoes Partition Assignment\n",NEW_INDENT);

  fprintf(ofd,"%s--Partitioning-Function----------------\n",NEW_INDENT);

} // PartitioningFunction::print()

void PartitioningFunction::display() const  { print(); }

// ***********************************************************************
// SinglePartitionPartitioningFunction
// ***********************************************************************

SinglePartitionPartitioningFunction::~SinglePartitionPartitioningFunction() {}

Lng32 SinglePartitionPartitioningFunction::getCountOfPartitions() const
{
  return EXACTLY_ONE_PARTITION;
}

PartitioningRequirement*
SinglePartitionPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireExactlyOnePartition(this);
}

const SinglePartitionPartitioningFunction*
SinglePartitionPartitioningFunction::castToSinglePartitionPartitioningFunction()
  const
{
  return this;
}

PartitioningFunction*
SinglePartitionPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
    SinglePartitionPartitioningFunction(*this, CmpCommon::statementHeap());
}

void SinglePartitionPartitioningFunction::createPartitioningKeyPredicates()
{
  // do nothing, there aren't any partitioning key preds for a single
  // partition
  storePartitioningKeyPredicates(ValueIdSet());
}

void SinglePartitionPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // do nothing, there aren't any pivs for a single
  // partition
}

// -----------------------------------------------------------------------
// SinglePartitionPartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
ItemExpr*
SinglePartitionPartitioningFunction::createPartitioningExpression()
{
  if (getExpression())      // already constructed?
    return getExpression(); // reuse it!

  // ---------------------------------------------------------------------
  // Construct an expression of the form: ConstValue(0)
  // Allocate ValueIds and type synthesize this partitioning function.
  // ---------------------------------------------------------------------
  ItemExpr * partFunc = new (CmpCommon::statementHeap())
    Cast(new (CmpCommon::statementHeap()) SystemLiteral(0),
	 new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE));

  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);
  return partFunc;
}

NABoolean SinglePartitionPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = other.getCountOfPartitions();

  // A single partition is a grouping of about anything. There
  // is only one exception: the replication partitioning "function".
  // Combining the partitions of a replication partitioning
  // function would get us multiple copies of the data which is
  // different from the single partition.
  return ((other.castToReplicateViaBroadcastPartitioningFunction() == NULL) AND
          (other.castToReplicateNoBroadcastPartitioningFunction() == NULL));

}

// -----------------------------------------------------------------------
// SinglePartitionPartitioningFunction::shouldUseSynchronousAccess()
// -----------------------------------------------------------------------
NABoolean SinglePartitionPartitioningFunction::shouldUseSynchronousAccess(
    const ReqdPhysicalProperty* rpp,
    const EstLogPropSharedPtr& inputLogProp,
    GroupAttributes* ga) const
{
  // An unpartitioned table does not need to be accessed synchronously
  // (or asynchronously, for that matter). So, always return FALSE.

  return FALSE;
}


// -----------------------------------------------------------------------
// Push down checking: testing condition C1.2: all tables are not
// partitioned.
// -----------------------------------------------------------------------
NABoolean
SinglePartitionPartitioningFunction::partFuncAndFuncPushDownCompatible(
	const PartitioningFunction& other) const
{
   if ( other.castToSinglePartitionPartitioningFunction() == 0 )
     return FALSE;

   return TRUE;
}

const NAString SinglePartitionPartitioningFunction::getText() const
{
  return "exactly 1 partition";
}

void SinglePartitionPartitioningFunction::print(FILE* ofd, const char* indent,
						const char* title) const
{
  PartitioningFunction::print(ofd, indent,
			      "SinglePartitionPartitioningFunction");
}

// ***********************************************************************
// ReplicateViaBroadcastPartitioningFunction
// ***********************************************************************

ReplicateViaBroadcastPartitioningFunction::
  ~ReplicateViaBroadcastPartitioningFunction()
{}

Lng32 ReplicateViaBroadcastPartitioningFunction::getCountOfPartitions() const
{ return numberOfPartitions_; }

const ReplicateViaBroadcastPartitioningFunction*
ReplicateViaBroadcastPartitioningFunction::
  castToReplicateViaBroadcastPartitioningFunction() const
{ return this; }

PartitioningFunction *
ReplicateViaBroadcastPartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  numberOfPartitions_ = suggestedNewNumberOfPartitions;
  return this;
}

PartitioningRequirement*
ReplicateViaBroadcastPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireReplicateViaBroadcast(this);
}

PartitioningFunction*
ReplicateViaBroadcastPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
              ReplicateViaBroadcastPartitioningFunction(*this);
}

void ReplicateViaBroadcastPartitioningFunction::
  createPartitioningKeyPredicates()
{
  // Do nothing, the partitioning key preds of a replication "function"
  // are the empty set. By doing nothing we will return all data for
  // each partition.
  storePartitioningKeyPredicates(ValueIdSet());
}

void ReplicateViaBroadcastPartitioningFunction::
       replacePivs(const ValueIdList& newPivs,
                   const ValueIdSet& newPartKeyPreds)
{
  // Do nothing, the pivs of a replication "function"
  // are the empty set.
}

ItemExpr*
ReplicateViaBroadcastPartitioningFunction::createPartitioningExpression()
{
  return NULL;
}

NABoolean ReplicateViaBroadcastPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // A replication partitioning function is never a grouping of any
  // other partitioning function than itself. See the definition of a
  // grouping: it is created by merging two partitions of "other"
  // zero or more times.
  return (comparePartFuncToFunc(other) == SAME);
}

const NAString ReplicateViaBroadcastPartitioningFunction::getText() const
{
  char ntimes[20];
  sprintf(ntimes,"%d",numberOfPartitions_);
  return NAString("broadcast ") + ntimes + " times";
}

void ReplicateViaBroadcastPartitioningFunction::
  print(FILE* ofd, const char* indent,
        const char* title) const
{
  PartitioningFunction::
    print(ofd, indent, "ReplicateViaBroadcastPartitioningFunction");
}

// ***********************************************************************
// ReplicateNoBroadcastPartitioningFunction
// ***********************************************************************

ReplicateNoBroadcastPartitioningFunction::
  ~ReplicateNoBroadcastPartitioningFunction()
{}

Lng32 ReplicateNoBroadcastPartitioningFunction::getCountOfPartitions() const
{ return numberOfPartitions_; }

const ReplicateNoBroadcastPartitioningFunction*
ReplicateNoBroadcastPartitioningFunction::
  castToReplicateNoBroadcastPartitioningFunction() const
{ return this; }

PartitioningFunction *
ReplicateNoBroadcastPartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  numberOfPartitions_ = suggestedNewNumberOfPartitions;
  return this;
}

PartitioningRequirement*
ReplicateNoBroadcastPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireReplicateNoBroadcast(this);
}

PartitioningFunction*
ReplicateNoBroadcastPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
              ReplicateNoBroadcastPartitioningFunction(*this);
}

void ReplicateNoBroadcastPartitioningFunction::
  createPartitioningKeyPredicates()
{
  // Do nothing, the partitioning key preds of a replication "function"
  // are the empty set. By doing nothing we will return all data for
  // each partition.
  storePartitioningKeyPredicates(ValueIdSet());
}

void ReplicateNoBroadcastPartitioningFunction::
       replacePivs(const ValueIdList& newPivs,
                   const ValueIdSet& newPartKeyPreds)
{
  // Do nothing, the pivs of a replication "function"
  // are the empty set.
}

ItemExpr*
ReplicateNoBroadcastPartitioningFunction::createPartitioningExpression()
{
  return NULL;
}

NABoolean ReplicateNoBroadcastPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // A replication partitioning function is never a grouping of any
  // other partitioning function than itself. See the definition of a
  // grouping: it is created by merging two partitions of "other"
  // zero or more times.
  return (comparePartFuncToFunc(other) == SAME);
}

const NAString ReplicateNoBroadcastPartitioningFunction::getText() const
{
  char ntimes[20];
  sprintf(ntimes,"%d",numberOfPartitions_);
  return NAString("replicate no broadcast ") + ntimes + " times";
}

void ReplicateNoBroadcastPartitioningFunction::
  print(FILE* ofd, const char* indent,
        const char* title) const
{
  PartitioningFunction::
    print(ofd, indent, "ReplicateNoBroadcastPartitioningFunction");
}

// ***********************************************************************
// HashPartitioningFunction
// ***********************************************************************

HashPartitioningFunction::~HashPartitioningFunction() {}

Lng32 HashPartitioningFunction::getCountOfPartitions() const
                                       { return numberOfHashPartitions_; }

const HashPartitioningFunction*
HashPartitioningFunction::castToHashPartitioningFunction() const
                                                          { return this; }

PartitioningRequirement*
HashPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireHash(this);
}

PartitioningFunction*
HashPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
    HashPartitioningFunction(*this, CmpCommon::statementHeap());
}

// -----------------------------------------------------------------------
// PartitioningFunction::createBetweenPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void PartitioningFunction::createBetweenPartitioningKeyPredicates(
       const char * pivLoName,
       const char * pivHiName,
       ItemExpr   * partNumExpr,
       NABoolean    useHash2Split)
{
  if (NOT partKeyPredsCreated() || partNumExpr != NULL)
    {
      ItemExpr * rootPtr;
      ItemExpr * loPart;
      ItemExpr * hiPart;
      ValueIdSet setOfPartKeyPredicates;

      // by default we use the partitioning function's expression
      // to compute the partition number
      if (partNumExpr == NULL)
        partNumExpr = createPartitioningExpression();

      // compute part input values if not already done so
      if (partitionInputValues_.isEmpty())
        {
          ValueIdList partInputValues;

          // must specify PIV names if they need to be created
          CMPASSERT(pivLoName && pivHiName);

          // the partition input values are two integer values: lo and hi part #
          loPart = new (CmpCommon::statementHeap())
            HostVar(pivLoName,
                    new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE),
                    TRUE);
          hiPart = new (CmpCommon::statementHeap())
            HostVar(pivHiName,
                    new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE,FALSE),
                    TRUE);
          loPart->synthTypeAndValueId();
          hiPart->synthTypeAndValueId();
          partInputValues.insert(loPart->getValueId());
          partInputValues.insert(hiPart->getValueId());
          storePartitionInputValues(partInputValues);
        }
      else
        {
          loPart = getPartitionInputValuesLayout()[0].getItemExpr();
          hiPart = getPartitionInputValuesLayout()[1].getItemExpr();
        }

      // -----------------------------------------------------------------
      // The partitioning key predicate for a hash partitioning selects
      // a range of hash partitions with the following predicate:
      //       partNumExpr >= :loPart AND partNumExpr < :hiPart
      // where the hash function is the partitioning function generated
      // by the createPartitioningExpression() method and the host variables
      // are generated here as partition input values.
      // -----------------------------------------------------------------

      if (useHash2Split)
        {
          // For a HASH2 function, the PIVs are expressed as min and max
          // hash values to be retrieved. These need to be converted to
          // partition numbers when we use them here. Note that we need to
          // use the original number of HASH2 partitions here. For example,
          // if we have a table salted with 64 salt buckets and we use
          // 8 ESPs (8 partitions), the predicate needs to select 8 different
          // salt values (original partitions) for each ESP.
          CMPASSERT(isAHash2PartitioningFunction());
          UInt32 numOfOrigPartns = castToHash2PartitioningFunction()->
            getCountOfOrigHashPartitions();
          char numPartsString[30];

          snprintf(numPartsString,sizeof(numPartsString),"%d",numOfOrigPartns);
          NAString numPartsLiteral(numPartsString);
          ConstValue *numPartns = new (CmpCommon::statementHeap())
            ConstValue(new (CmpCommon::statementHeap())
                         SQLInt(CmpCommon::statementHeap(), FALSE,
                                FALSE),
                       (void *) &numOfOrigPartns,
                       (Lng32) sizeof(numOfOrigPartns),
                       &numPartsLiteral,
                       CmpCommon::statementHeap());

          loPart = new (CmpCommon::statementHeap())
            Hash2Distrib(loPart, numPartns);
          hiPart = new (CmpCommon::statementHeap())
            Hash2Distrib(hiPart, numPartns);
        }

      // lower bound
      rootPtr = new (CmpCommon::statementHeap())
        BiRelat(ITM_GREATER_EQ,
                partNumExpr,
                loPart,
                TRUE);
      rootPtr->synthTypeAndValueId();
      setOfPartKeyPredicates += rootPtr->getValueId();

      // upper bound
      rootPtr = new (CmpCommon::statementHeap())
        BiRelat((useHash2Split ? ITM_LESS_EQ : ITM_LESS),
                partNumExpr,
                hiPart,
                TRUE);
      rootPtr->synthTypeAndValueId();
      setOfPartKeyPredicates += rootPtr->getValueId();

      // Store the set of key predicates in the partitioning attributes.
      storePartitioningKeyPredicates(setOfPartKeyPredicates);
    }
} // PartitioningFunction::createBetweenPartitioningKeyPredicates()

void HashPartitioningFunction::createPartitioningKeyPredicates()
{
  createBetweenPartitioningKeyPredicates(
       "_sys_HostVarLoHashPart",
       "_sys_HostVarHiHashPart");
}

// -----------------------------------------------------------------------
// HashPartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
void HashPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // HashPartitioningFunction::replacePivs()

COMPARE_RESULT
HashPartitioningFunction::comparePartFuncToFunc
                               (const PartitioningFunction &other) const
{
  COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other);

  if (c != SAME)
    return INCOMPATIBLE;

  return comparePartKeyToKey(other);
}

COMPARE_RESULT
HashPartitioningFunction::comparePartKeyToKey
                               (const PartitioningFunction &other) const
{
  if (getPartitioningFunctionType() != other.getPartitioningFunctionType()) 
    return INCOMPATIBLE;

  const HashPartitioningFunction &oth =
    (const HashPartitioningFunction &) other;

  if (keyColumnList_.entries() != oth.keyColumnList_.entries())
    return INCOMPATIBLE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
  {
    if (keyColumnList_[i] != oth.keyColumnList_[i])
      return INCOMPATIBLE;

    if (  NOT (originalKeyColumnList_[i].getType() ==
               oth.originalKeyColumnList_[i].getType() )
          )
      return INCOMPATIBLE;
  }

  return SAME;
}

// -----------------------------------------------------------------------
// HashPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
  // Right now we assume that the split function of a hash partitioning
  // function is such that no two functions are a grouping of each other.
  // The only exception is identity, of course. We use the base class'
  // implementation.
  //
  // With more knowledge about the split function (e.g. by knowing it's
  // a simple modulus), one could guarantee that a 4-way hash-partitioning
  // scheme is actually a grouping of an 8-way scheme. This is not really
  // necessary and therefore not done here.

// -----------------------------------------------------------------------
// HashPartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
ItemExpr* HashPartitioningFunction::createPartitioningExpression()
{
  if (getExpression())      // already constructed?
    return getExpression(); // reuse it!

  // ---------------------------------------------------------------------
  // Construct an expression of the form:
  //
  //                            Modulus
  //                            /     \
  //                           /       \
  //                        Hash     <count of partitions>
  //                         |
  //      (narrow(key1), narrow(key2), ..., narrow(keyN))
  //
  // Allocate ValueIds and type synthesize this partitioning function.
  // ---------------------------------------------------------------------
  ValueIdList typedKeyCols;

  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
    {
      // cast the key column to the exact type of the original key column
      const NAType &oType = originalKeyColumnList_[i].getType();

      ItemExpr *c = getCastedItemExpre(keyColumnList_[i].getItemExpr(), oType, 
                                CmpCommon::statementHeap());

      c->synthTypeAndValueId();
      typedKeyCols.insert(c->getValueId());
    }

  ItemExpr * partFunc =
    new (CmpCommon::statementHeap())
         Modulus(
                 buildHashingExpressionForExpr(
                      typedKeyCols.rebuildExprTree(ITM_ITEM_LIST)
                                              ),
	         //     Hash(typedKeyCols.rebuildExprTree(ITM_ITEM_LIST)),
	         new (CmpCommon::statementHeap())
	              SystemLiteral(getCountOfPartitions()));

  // once we support late binding, and/or changing the number of target
  // partitions at run time, the number of partitions will have
  // to become some sort of an input value $$$$

  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);
  return partFunc;
} // HashPartitioningFunction::createPartitioningExpression()

ItemExpr *
HashPartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) const
{
  return new (CmpCommon::statementHeap()) Hash(expr);
}

UInt32 HashPartitioningFunction::computeHashValue(char* data, UInt32 flags, Int32 len) 
{
  // Directly call the implementation function to compute the hash. NULL
  // values and VARCHAR data types are not handled.
  return FastHash(data, len);
}

ItemExpr * HashPartitioningFunction::getHashingExpression() const
{
  ItemExpr* hashExpr = NULL;
  ItemExpr* partExpr = getExpression();
  if ( partExpr ) {
    hashExpr = partExpr->child(0);

    CMPASSERT(hashExpr AND
              hashExpr->getOperatorType()== ITM_HASH);
  }
  return hashExpr;
}

// -----------------------------------------------------------------------
// HashPartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void HashPartitioningFunction::remapIt
                                  (const PartitioningFunction* opf,
			           ValueIdMap& map, NABoolean mapItUp)
{
  PartitioningFunction::remapIt(opf, map,mapItUp);

  // If we have arrived here, the original partitioning function (*opf)
  // MUST be a HashPartitioningFunction().
  CMPASSERT(opf->castToHashPartitioningFunction());

  // Clear because rewrite insists on it being so.
  keyColumnList_.clear();

  if (mapItUp)
    {
      map.rewriteValueIdListUp(
	   keyColumnList_,
	   opf->castToHashPartitioningFunction()->keyColumnList_);
    }
  else
    {
      map.rewriteValueIdListDown(
	   opf->castToHashPartitioningFunction()->keyColumnList_,
	   keyColumnList_);
    }

  // do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL

} // HashPartitioningFunction::remapIt()

PartitioningFunction * HashPartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) {
     NodeMap* newNodeMap =
       new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
     replaceNodeMap(newNodeMap);
  }

  numberOfHashPartitions_ = suggestedNewNumberOfPartitions;
  return this;
}

// -----------------------------------------------------------------------
// HashPartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void HashPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  ValueIdSet noExternalInputs;
  PartitioningFunction::preCodeGen(availableValues);
  keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs, 
                                        FALSE, NULL, TRUE);
} // HashPartitioningFunction::preCodeGen()

// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
const NAString HashPartitioningFunction::getTextImp(const char* hashType) const
{
  char nparts[20];
  NAString result(hashType, CmpCommon::statementHeap());
  result.append(" partitioned ");
  
  sprintf(nparts,"%d",numberOfHashPartitions_);
  result += nparts;
  result += " ways on (";
  //getPartitioningKey().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";
  if (result.contains("randomNum"))
   result = NAString("round robin partitioned");
  return result;
}

const NAString HashPartitioningFunction::getText() const
{
   return getTextImp("hash"); 
}

void HashPartitioningFunction::print(FILE* ofd, const char* indent,
	   			     const char* title) const
{
  PartitioningFunction::print(ofd, indent, "HashPartitioningFunction");
} // HashPartitioningFunction::print()

// Return an expression casting an encoded skew value to oType.
static 
ItemExpr* 
getCastedSkewValueExpr(const EncodedValue& ev, const NAType& oType, CollHeap* heap) 
{
  double x = ev.getDblValue();
  return new (heap) Cast(
       new (heap) ConstValue(
             new (heap) SQLDoublePrecision(heap, FALSE /* no SQL NULL*/), 
             (char*)&x, sizeof(x)
                            ), 
       &oType
                        );
}

// ***********************************************************************
// TableHashPartitioningFunction 
//        - The externalized Hash Partitioning functions.
// ***********************************************************************

// -----------------------------------------------------------------------
// TableHashPartitioningFunction Destructor
// -----------------------------------------------------------------------
TableHashPartitioningFunction::~TableHashPartitioningFunction() {}

void TableHashPartitioningFunction::setupForStatement()
{
  if(setupForStatement_)
    return;

  PartitioningFunction::setupForStatement();

  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}

void TableHashPartitioningFunction::resetAfterStatement()
{
  if(resetAfterStatement_)
    return;

  PartitioningFunction::resetAfterStatement();
  keyColumnList_.clear();
  originalKeyColumnList_.clear();
  numberOfPartitions_ = numberOfOrigHashPartitions_;

  setupForStatement_ = FALSE;
  resetAfterStatement_ = TRUE;
}

// -----------------------------------------------------------------------
// TableHashPartitioningFunction Safe down cast.
// -----------------------------------------------------------------------
const TableHashPartitioningFunction*
TableHashPartitioningFunction::castToTableHashPartitioningFunction() const
{
  return this;
}

Lng32
TableHashPartitioningFunction::getCountOfPartitions() const
{
  return numberOfPartitions_;
}

PartitioningRequirement*
TableHashPartitioningFunction::makePartitioningRequirement()
{
  // Redefine PartitioningFunction::makePartitioningRequirement()
  CMPABORT;
  return NULL;
}


// -----------------------------------------------------------------------
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::createPartitionSelectionExprInputs()
{
  if (partitionSelectionExprInputs().entries() != 0)
    return;

  CollHeap *heap = CmpCommon::statementHeap();

  // Use a host var to provide access to numParts, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  char hvFabricatedName[50];
  sprintf(hvFabricatedName, "_sys_hostVarNumParts_%p", this);
  ItemExpr *numParts = new (heap)
        HostVar(hvFabricatedName,
                // int not null
                new (heap) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
                // is system-supplied
                TRUE);
  numParts->synthTypeAndValueId();

  // Use a host var to provide access to partNum, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  sprintf(hvFabricatedName, "_sys_hostVarPartNo_%p", this);
  ItemExpr *partNum = new (heap)
        HostVar(hvFabricatedName,
                // int not null
                new (heap) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
                // is system-supplied
                TRUE);
  partNum->synthTypeAndValueId();

  // Record these hostvars as the inputs to the partitionSelectionExpr.
  //
  partitionSelectionExprInputs().insert(partNum->getValueId());
  partitionSelectionExprInputs().insert(numParts->getValueId());
}

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::normalizePartitioningKeys()
// Rewrite the partitioning keys of the partitioning function in
// terms of the VEGReference for the VEG to which the partitioning
// key column belongs.
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  PartitioningFunction::normalizePartitioningKeys(normWARef);
  keyColumnList_.normalizeNode(normWARef);

  // don't normalize original key col list, avoid VEGies which could
  // cause data type changes.
}

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::createPartitioningKeyPredicates()
// Since hash dist partitioning can not create any Key Predicates,
// this method simply creates the partition input values (PIVs).
// -----------------------------------------------------------------------
void TableHashPartitioningFunction::createPartitioningKeyPredicates()
{
  if (NOT partKeyPredsCreated())
    createBetweenPartitioningKeyPredicates("_sys_HostVarLoHashPart",
                                           "_sys_HostVarHiHashPart");

  // Create the partition selection input values (needed by hash2).
  createPartitionSelectionExprInputs();
} // TableHashPartitioningFunction::createPartitioningKeyPredicates()

// -----------------------------------------------------------------------
// For salted tables, since we store the hash partition value, we
// can generate a range predicate for the _SALT_ column
// -----------------------------------------------------------------------
void TableHashPartitioningFunction::createPartitioningKeyPredicatesForSaltedTable(
     ValueId saltCol)
{
  // For now we only allow this call after calling the regular
  // createPartitioningKeyPredicates() method
  CMPASSERT(partKeyPredsCreated());
  
  // this allows us to specify NULL for the names here
  createBetweenPartitioningKeyPredicates(NULL,
                                         NULL,
                                         saltCol.getItemExpr(),
                                         isAHash2PartitioningFunction());

  // and we don't need to call this
  // createPartitionSelectionExprInputs();
} // TableHashPartitioningFunction::createPartitioningKeyPredicates()

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
void TableHashPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // TableHashPartitioningFunction::replacePivs()

PartitioningFunction*
TableHashPartitioningFunction::createPartitioningFunctionForIndexDesc
                         (IndexDesc *idesc) const
{
  // Redefine PartitioningFunction::createPartitioningFunctionForIndexDesc()
  CMPABORT;
  return NULL;
}

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::remapIt(const PartitioningFunction* opf,
                                      ValueIdMap& map,
                                      NABoolean mapItUp)
{
  PartitioningFunction::remapIt(opf, map,mapItUp);

  // If we have arrived here, the original partitioning function (*opf)
  // MUST be a TableHashPartitioningFunction().
  //
  const TableHashPartitioningFunction *oth =
    opf->castToTableHashPartitioningFunction();

  CMPASSERT(oth);

  // Clear because rewrite insists on it being so.
  //
  keyColumnList_.clear();

  if (mapItUp)
    map.rewriteValueIdListUp(keyColumnList_, oth->keyColumnList_);
  else
    map.rewriteValueIdListDown(oth->keyColumnList_, keyColumnList_);

  // do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL

} // TableHashPartitioningFunction::remapIt()

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void
TableHashPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  ValueIdSet noExternalInputs;

  PartitioningFunction::preCodeGen(availableValues);

  keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs, 
                                        FALSE, NULL, TRUE);

} // TableHashPartitioningFunction::preCodeGen()

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
ItemExpr* TableHashPartitioningFunction::createPartitioningExpression()
{
   return TableHashPartitioningFunction::createPartitioningExpressionImp(FALSE);
} // TableHashPartitioningFunction::createPartitioningExpression()

ItemExpr* 
TableHashPartitioningFunction::createPartitioningExpressionImp(NABoolean doVarCharCast)
{
  // already constructed with the same doVarCharCast argument?
  if (getExpression() && doVarCharCast == doVarCharCast_ )
    return getExpression(); // reuse it!

  doVarCharCast_ = doVarCharCast;

  ValueIdList typedKeyCols;

  const ValueIdList &keyColumnList = getKeyColumnList();
  const ValueIdList &originalKeyColumnList = getOriginalKeyColumnList();

  CollHeap *heap = CmpCommon::statementHeap();

  for (CollIndex i = 0; i < keyColumnList.entries(); i++)
    {
      const NAType &oType = originalKeyColumnList[i].getType();
      ItemExpr *c;
      ItemExpr *dataConversionErrorFlag = getConvErrorExpr();
      if (dataConversionErrorFlag == 0)
        {
          dataConversionErrorFlag =
            new (heap) HostVar("_sys_repartConvErrorFlg",
                               new (heap) SQLInt(heap, TRUE,FALSE),
                               TRUE);
          storeConvErrorExpr(dataConversionErrorFlag);
        }

      // Note that we always generate a Narrow operator here, even
      // if it's not necessary. Because keyColumnList[i] may be a
      // VEGReference it may not be possible to determine its
      // exact type at this time. We could eliminate some cases of
      // Narrow in the preCodeGen phase but this is not done yet.

      NAType* finalTypePtr = oType.newCopy(heap);

      if ( oType.getTypeQualifier() == NA_CHARACTER_TYPE && 
           doVarCharCast == TRUE ) {

         ValueId vId = originalKeyColumnList[i];
         ValueIdSet vidSet;
   
         CharType *maxCharType = NULL;

         switch (vId.getItemExpr()->getOperatorType()) {
           case ITM_VEG_PREDICATE:
           case ITM_VEG_REFERENCE:
             {
              vId.getItemExpr()->findAll(ITM_BASECOLUMN, vidSet, TRUE, TRUE);
              for (ValueId x=vidSet.init(); vidSet.next(x); vidSet.advance(x)) 
              {
                const CharType &ctype = (CharType&)(x.getType());

                if ( maxCharType == NULL OR 
                     maxCharType->getNominalSize() < ctype.getNominalSize()   OR
                    (maxCharType->getNominalSize() == ctype.getNominalSize()  AND
                     maxCharType->getStrCharLimit() < ctype.getStrCharLimit())
                   )
                    maxCharType = &(CharType&)x.getType() ;
              }
              finalTypePtr = maxCharType->equivalentVarCharType(heap);
             }

             break;
   
           case ITM_INDEXCOLUMN:
           case ITM_BASECOLUMN:
             // For skewbuster+OCR nested join, the partition key column 
             // is not a VEG or VEGref since the equality predicates have 
             // been pushed down to the inner table. Here we cast the 
             // column type to equivalent VarCHAR, regardless of the length
             // of the other column in the join predicate. The optimization
             // works fine because we just want to trim the trailing 
             // spaces and then hash the trimmed value. 
             finalTypePtr = ((CharType&)(vId.getType())).equivalentVarCharType(heap);
             break;

           default:
             break;
         }
      }

      // leave the statement heap to delete the SQLVarChar object, if created
      c = new (heap) Narrow(keyColumnList[i].getItemExpr(),
                            dataConversionErrorFlag,
                            finalTypePtr, 
                            ITM_NARROW,
                            FALSE /* reverseDataErrorConversionFlag */,
                            TRUE /* match child nullability */);

      c->synthTypeAndValueId();

      typedKeyCols.insert(c->getValueId());
    }

  ItemExpr * partFunc = buildPartitioningExpression(typedKeyCols);

#if 0
  //===================================================================
  // Test the classification expression
  //===================================================================
  ItemExpr* isSkewExpr = createClassificationExpressionForSkewedValues();

  if ( isSkewExpr ) {
     ValueIdList exprs;
     exprs.insert(isSkewExpr->getValueId());

     char resultBuf[2000];
     ex_expr::exp_return_type evalReturnCode = exprs.evalAtCompileTime
     (0, ExpTupleDesc::SQLARK_EXPLODED_FORMAT, resultBuf, 2000);
  }
#endif

  // once we support late binding, and/or changing the number of target
  // partitions at run time, the number of partitions will have
  // to become some sort of an input value $$$$
  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);

#if 0 // test 
   buildHashListForSkewedValues();
#endif

  return partFunc;


} // TableHashPartitioningFunction::createPartitioningExpressionImp()

// -----------------------------------------------------------------------
// TableHashPartitioningFunction::createPartitionSelectionExpr()
// Create the partition selection expression.  'Partition selection'
// means that an expression is used to determine the partition to
// access as opposed to using the File System to determine the range
// of partitions to access based on a set of partitioning key
// predicates. Partition selection is currently used for Hash Dist and
// Round Robin Partitioning. And the File System is used for Range
// Partitioning.  If a partitioning selection expression is created,
// it is cached in the data member 'partitionSelectionExpr_' and the
// partition selection inputs are generated and stored in
// 'partitionSelectionExprInputs_'. This method is redefined for
// TableHashPartitioningFunction and RoundRobinPartitioningFunction.
// -----------------------------------------------------------------------
ItemExpr *
TableHashPartitioningFunction::
createPartitionSelectionExpr(const SearchKey *partSearchKey,
                             const ValueIdSet &availableValues)
{

  // If it has already been created, return cached version.
  //
  if(partitionSelectionExpr())
    return partitionSelectionExpr();

  // For now, only support a partition selection expression when the
  // partition search key is unique (identifies exactly one partition).
  //
  if(!partSearchKey->isUnique()) {
    return NULL;
  } else {

    CollHeap *heap = CmpCommon::statementHeap();
    const ValueIdList &keyColumns =  partSearchKey->getKeyColumns();
    const ValueIdList &keyValues =  partSearchKey->getBeginKeyValues();
    ValueIdList newValues;

    createPartitionSelectionExprInputs();
    const ValueIdList &partSelExprInputs = partitionSelectionExprInputs();
    ItemExpr *numParts = partSelExprInputs[1].getItemExpr();

    // Construct the list of key values.  Make sure that they are cast
    // to the exact type of the original key columns since the Hash
    // function is sensitive to the type of its inputs.  Must use the
    // original key columns, since the keyColumnList may have been
    // changed.
    //
    ValueIdList typedKeyCols;
    const ValueIdList &originalKeyColumnList = getOriginalKeyColumnList();

    for (CollIndex i = 0; i < keyColumns.entries(); i++)
      {
        // cast the key value to the exact type of the original key column
        const NAType &oType = originalKeyColumnList[i].getType();

        ItemExpr *c;
        ItemExpr *dataConversionErrorFlag = getConvErrorExpr();
        if (dataConversionErrorFlag == 0)
          {
            dataConversionErrorFlag =
              new (heap) HostVar("_sys_repartConvErrorFlg",
                                 new (heap) SQLInt(heap, TRUE,FALSE),
                                 TRUE);
            storeConvErrorExpr(dataConversionErrorFlag);
          }

        // Note that we always generate a Narrow operator here, even
        // if it's not necessary. Because keyColumnList[i] may be a
        // VEGReference it may not be possible to determine its
        // exact type at this time. We could eliminate some cases of
        // Narrow in the preCodeGen phase but this is not done yet.
        //
        NAType * finalTypePtr = oType.newCopy(heap);
        c = new (heap) Narrow(keyValues[i].getItemExpr(),
                              dataConversionErrorFlag,
                              finalTypePtr);
        c->synthTypeAndValueId();
        typedKeyCols.insert(c->getValueId());
      }

    // Construct and cache the partitionSelectionExpr for Table Hash.
    //
    partitionSelectionExpr() =
      buildPartitioningSelectionExpr(typedKeyCols, numParts);

    // Bind the expression
    //
    partitionSelectionExpr()->synthTypeAndValueId();

    // PreCodeGen the expression (This maybe should go in
    // TableHashPartitioningFunction::preCodeGen(), but preCodeGen() is
    // typically called before this expression is generated.
    //
    partitionSelectionExpr()->replaceVEGExpressions(availableValues,
                                                    availableValues);

    return partitionSelectionExpr();
  }
} // TableHashPartitioningFunction::createPartitionSelectionExpr()

// -----------------------------------------------------------------------
// Make a new partSearchKey that indicates that PA_PARTITION_GROUPING
// is being done.  Note that a search key can not be generated which
// can group hashed partitions.  For TableHashPartitioning, a flag in
// the search key is used to indicate that PA_PARTITION_GROUPING is
// being done and the begin/end key values of the search key are set
// to the partition input values of the partitioning function.
// -----------------------------------------------------------------------
SearchKey *
TableHashPartitioningFunction::createSearchKey(const IndexDesc *indexDesc,
                                               ValueIdSet availInputs,
                                               ValueIdSet additionalPreds) const
{
  ValueIdSet preds(getPartitioningKeyPredicates());
  ValueIdSet nonKeyColumnSet; // empty set
  SearchKey *partSearchKey = NULL;

  availInputs += getPartitionInputValues();
  preds += additionalPreds;

  if (indexDesc->getPrimaryTableDesc()->getNATable()->isHbaseTable())
    {
      // The HbaseAccess executor operator doesn't have a separate
      // place to handle partitioning key predicates. Instead, we can
      // only use them as regular key or executor predicates.
      // For salted tables, we have a chance to read a range of salt
      // values through a begin/end key.
      partSearchKey = new (CmpCommon::statementHeap())
        SearchKey(indexDesc->getIndexKey(),
                  indexDesc->getOrderOfKeyValues(),
                  availInputs,
                  TRUE,
                  preds,
                  nonKeyColumnSet,
                  indexDesc);
    }
  else
    {
      // Call this special constructor that constructs a search key for a
      // TableHashPartitioningFunction.
      //
      partSearchKey = new (CmpCommon::statementHeap())
        SearchKey(indexDesc->getPartitioningKey(),
                  indexDesc->getOrderOfPartitioningKeyValues(),
                  availInputs,
                  preds,
                  this,
                  nonKeyColumnSet,
                  indexDesc);
    }

  return partSearchKey;
} // TableHashPartitioningFunction::createSearchKey()


ItemExpr *
TableHashPartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) 
const
{
   CollHeap *heap = CmpCommon::statementHeap();
   return new (heap) HashDistPartHash(expr);
}

UInt32 TableHashPartitioningFunction::computeHashValue(char* data, UInt32 flags, Int32 len) 
{
  // Directly call the implementation function to compute the hash. NULL
  // values and VARCHAR data types are not handled.
  return ExHDPHash::hash(data, flags, len);
}

ItemExpr * TableHashPartitioningFunction::getHashingExpression() const
{
  ItemExpr* hashExpr = NULL;
  ItemExpr* partExpr = getExpression();
  if ( partExpr ) {
    hashExpr = partExpr->child(0);

    CMPASSERT(hashExpr);

    if ( hashExpr->getOperatorType() == ITM_PROGDISTRIB ) {
      hashExpr = hashExpr->child(0);
      CMPASSERT(hashExpr);
    }

    CMPASSERT ( hashExpr->getOperatorType() == ITM_HDPHASH);
  }
  return hashExpr;
}

// -----------------------------------------------------------------------
// HashDistPartitioningFunction Destructor
// -----------------------------------------------------------------------
HashDistPartitioningFunction::~HashDistPartitioningFunction() {}

// -----------------------------------------------------------------------
// HashDistPartitioningFunction Safe down cast.
// -----------------------------------------------------------------------
const HashDistPartitioningFunction*
HashDistPartitioningFunction::castToHashDistPartitioningFunction() const
{
  return this;
}

PartitioningRequirement*
HashDistPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
    RequireHashDist(this);
}

PartitioningFunction*
HashDistPartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap()) HashDistPartitioningFunction(*this);
}

// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
const NAString HashDistPartitioningFunction::getText() const
{
  NAString result("hash1 partitioned ");
 
  char nparts[40];
  sprintf(nparts,"%d (%d) ways on (", numberOfPartitions_,
          numberOfOrigHashPartitions_);
  result += nparts;

  getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";

  return result;
}

void HashDistPartitioningFunction::print(FILE* ofd, const char* indent,
                                         const char* title) const
{
  PartitioningFunction::print(ofd, indent, "HashDistPartitioningFunction");
} // TableHashPartitioningFunction::print()

// -----------------------------------------------------------------------
// HashDistPartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
PartitioningFunction*
HashDistPartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  CollIndex ixColNumber;
  ValueId keyValueId;
  ValueIdSet partitioningKey;
  ValueIdList partitioningKeyList;

  for (CollIndex i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id set
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(i,keyValueId);
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate a new HashPartitioningFunction.
  // -----------------------------------------------------------------
  HashDistPartitioningFunction *partFunc = new(idesc->wHeap())
    HashDistPartitioningFunction (partitioningKey,
                                  partitioningKeyList,
                                  getCountOfPartitions(),
                                  getNodeMap()->copy(idesc->wHeap()));

  // -----------------------------------------------------------------
  // Construct the partitioning key predicates.
  // -----------------------------------------------------------------
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // HashDistPartitioningFunction::createPartitioningFunctionForIndexDesc()


// -----------------------------------------------------------------------
// HashDistPartitioningFunction::comparePartFuncToFunc(): Compare this
// partitioning function to another hash dist function. To be 'SAME'
// must have the same number and order of partitioning key columns and
// have the same number of partitions (scaled and original).
// -----------------------------------------------------------------------
COMPARE_RESULT
HashDistPartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other);

  if (c != SAME)
    return INCOMPATIBLE;

  const HashDistPartitioningFunction *oth =
    other.castToHashDistPartitioningFunction();

  // Since they compared 'SAME', oth should always exist, so this
  // test is redundant.
  //
  if(!oth)
    return INCOMPATIBLE;

  // They must be based on the same physical partitioning.
  //
  if (getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())
    return INCOMPATIBLE;

  // Make sure that the keys are in the same order.
  //
  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return INCOMPATIBLE;

  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
  {
    if (keyColumnList_[i] != oth->keyColumnList_[i])
      return INCOMPATIBLE;

    if (  NOT (originalKeyColumnList_[i].getType() ==
         oth->originalKeyColumnList_[i].getType() )
       )
      return INCOMPATIBLE;
  }

  return SAME;
} // HashDistPartitioningFunction::comparePartFuncToFunc()


PartitioningFunction *
HashDistPartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  // Allow arbitrary scaling down of HashDistPartitioningFunction.
  // (The runtime will handle the mapping of physical partitions
  // to logical partitions.)
  // Scaling up (to a greater number of partitions is not allowed for
  // HashDistPartitioningFunction.
  //
  suggestedNewNumberOfPartitions = 
    (suggestedNewNumberOfPartitions > numberOfPartitions_) 
    ? numberOfPartitions_
    : suggestedNewNumberOfPartitions;

  numberOfPartitions_ = suggestedNewNumberOfPartitions;

  return this;
} // HashDistPartitioningFunction::scaleNumberOfPartitions()

// -----------------------------------------------------------------------
// HashDistPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
NABoolean
HashDistPartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                            Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  const HashDistPartitioningFunction *oth =
    other.castToHashDistPartitioningFunction();

  // If other is not a HashDistPartitioningFunction, then it cannot
  // be a grouping of...
  if (oth == NULL)
    return FALSE;

  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return FALSE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
  {
    if (keyColumnList_[i] != oth->keyColumnList_[i])
      return FALSE;
    if (  NOT (originalKeyColumnList_[i].getType() ==
         oth->originalKeyColumnList_[i].getType() )
       )
      return FALSE;
  }

  // If this function has more partitions than other,
  // then it cannot be a grouping of.
  // Eg.  this.numParts: 10 this.origNumParts: 20
  //       oth.numParts: 5   oth.origNumParts: 20
  //
  // If the two functions are not based on the same physical function,
  // then it cannot be a grouping of.
  // Eg.  this.numParts: 10 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 30
  //
  if((getCountOfPartitions() > oth->getCountOfPartitions()) ||
     (getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())) {
    return FALSE;
  }

  // Here the following is known to be TRUE:
  //
  //  (getCountOfPartitions() <= oth->getCountOfPartitions()
  //
  // AND
  //
  //  (getCountOfOrigHashPartitions() == oth->getCountOfOrigHashPartitions())
  //
  // Eg.  this.numParts: 10 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20

  // If other has not been scaled (allow arbitrary scaling of one function):
  // Eg.  this.numParts:  7 this.origNumParts: 20
  //       oth.numParts: 20  oth.origNumParts: 20
  // OR
  // If they have both been scaled to the same number of partitions:
  // then it is a grouping of.
  // Eg.  this.numParts:  7 this.origNumParts: 20
  //       oth.numParts:  7  oth.origNumParts: 20
  //
  if((oth->getCountOfPartitions() == oth->getCountOfOrigHashPartitions()) ||
     (getCountOfPartitions() == oth->getCountOfPartitions())) {

    if (maxPartsPerGroup != NULL)
      *maxPartsPerGroup =
        ((oth->getCountOfPartitions() + getCountOfPartitions() - 1)
         / getCountOfPartitions());

    return TRUE;
  }

  // WARNING.....  I am not sure if the current code can ever produce
  // a situation that would bring control to here.  Also, I am not
  // sure if the semantics of GROUPING implemented below are correct
  // for these situations.
  //
  // Here the following is known to be TRUE:
  //
  //  both functions have been scaled.  (I DON'T THINK THIS CAN HAPPEN)
  //
  // AND
  //
  //  They are scaled to different sizes.
  //
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts:  7 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //

  // Under these conditions, three things must be true for it to be a
  // grouping of:
  //
  // - the scaled number of partitions must evenly divide the scaled
  //   number of partitions of other
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  // - the other scaling must be a multiple of or evenly divide the
  //   original number of partitions
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 40  oth.origNumParts: 20
  //
  // - this scaling must also be a multiple of or evenly divide the
  //   original number of partitions
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts: 40 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //

  // If the scaled number of partitions evenly divides the scaled
  // number of partitions of other...
  //
  if((oth->getCountOfPartitions() % getCountOfPartitions()) != 0) {
    return FALSE;
  }

  // AND the other scaling is a multiple of or evenly divides the
  // original number of partitions...
  //
  if(oth->getCountOfOrigHashPartitions() >= oth->getCountOfPartitions()) {
    if(oth->getCountOfOrigHashPartitions() % oth->getCountOfPartitions()) {
      return FALSE;
    }
  } else {
    if(oth->getCountOfPartitions() % oth->getCountOfOrigHashPartitions()) {
      return FALSE;
    }
  }

  // AND this scaling is a multiple of or evenly divides the original
  // number of partitions ...
  //
  if(getCountOfOrigHashPartitions() >= getCountOfPartitions()) {
    if(getCountOfOrigHashPartitions() % getCountOfPartitions()) {
      return FALSE;
    }
  } else {
    if(getCountOfPartitions() % getCountOfOrigHashPartitions()) {
      return FALSE;
    }
  }

  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup =
      ((oth->getCountOfPartitions() + getCountOfPartitions() - 1)
       / getCountOfPartitions());

  // THEN it is a grouping of...
  //
  return TRUE;

} // HashDistPartitioningFunction::isAGroupingOf()

// -----------------------------------------------------------------------
//  - Create expressions for the constant values for the original and
//    scale number of partitions.  Create the hash1 (hash distrib)
//    function, PAGroup, and return the result.
// -----------------------------------------------------------------------
ItemExpr *
HashDistPartitioningFunction::buildPartitioningExpression(
                             const ValueIdList &keyCols) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);
  char buffer[20];

  // Create a ConstValue expression containing the original number of hash
  // partitions.
  Lng32 numParts = getCountOfOrigHashPartitions();
  sprintf(buffer, "%d", numParts);
  NAString numPartsStr("origNumParts");
  ItemExpr *origNumParts =
    new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Create a ConstValue expression containing the scaled number of hash
  // partitions.
  numParts = getCountOfPartitions();
  sprintf(buffer, "%d", numParts);
  numPartsStr = "scaledNumParts";
  ItemExpr *scaledNumParts = new (heap)
     ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Create the hash distribution function.
  // partitions.
  ItemExpr *partFunc =
    new (heap)
    ProgDistrib(new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), origNumParts);

  // Add a PAGroup expression and return the partitioning function
  partFunc = new (heap) PAGroup(partFunc, scaledNumParts, origNumParts);
  return partFunc;
}

// -----------------------------------------------------------------------
// HashDistPartitioningFunction::buildPartitioningSelectionExpr
//  - build the expression used during partition selection
// -----------------------------------------------------------------------
ItemExpr *
HashDistPartitioningFunction::buildPartitioningSelectionExpr(
                             const ValueIdList &keyCols,
                             ItemExpr *numParts) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  ItemExpr *partFunc =
    new (heap)
    ProgDistrib(new (heap)
                HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)),
                numParts);

  return partFunc;
}

// -----------------------------------------------------------------------
// Hash2PartitioningFunction Destructor
// -----------------------------------------------------------------------
Hash2PartitioningFunction::~Hash2PartitioningFunction() {}

// -----------------------------------------------------------------------
// Hash2PartitioningFunction Safe down cast.
// -----------------------------------------------------------------------
const Hash2PartitioningFunction*
Hash2PartitioningFunction::castToHash2PartitioningFunction() const
{
  return this;
}

PartitioningRequirement*
Hash2PartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
    RequireHash2(this);
}

PartitioningFunction*
Hash2PartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap()) Hash2PartitioningFunction(*this);
}

// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
const NAString Hash2PartitioningFunction::getText() const
{
  NAString result("hash2 partitioned ");

  char nparts[32];
  sprintf(nparts,"%d ways on (", numberOfPartitions_);
  result += nparts;

  getKeyColumnList().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";

  return result;
}

void Hash2PartitioningFunction::print(FILE* ofd, const char* indent,
                                         const char* title) const
{
  PartitioningFunction::print(ofd, indent, "Hash2PartitioningFunction");
} // TableHashPartitioningFunction::print()


// -----------------------------------------------------------------------
// Hash2PartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
PartitioningFunction*
Hash2PartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  CollIndex ixColNumber;
  ValueId keyValueId;
  ValueIdSet partitioningKey;
  ValueIdList partitioningKeyList;

  for (CollIndex i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id set
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(i,keyValueId);
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate a new HashPartitioningFunction.
  // -----------------------------------------------------------------
  Hash2PartitioningFunction *partFunc = new(idesc->wHeap())
    Hash2PartitioningFunction (partitioningKey,
                               partitioningKeyList,
                               getCountOfPartitions(),
                               getNodeMap()->copy(idesc->wHeap()));

  partFunc->setRestrictedBeginPartNumber(getRestrictedBeginPartNumber());
  partFunc->setRestrictedEndPartNumber(getRestrictedEndPartNumber());

  // -----------------------------------------------------------------
  // Construct the partitioning key predicates.
  // -----------------------------------------------------------------
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // Hash2PartitioningFunction::createPartitioningFunctionForIndexDesc()

// -----------------------------------------------------------------------
// Hash2PartitioningFunction::createPartSelectionExprFromSearchKey
// is called from PartitionAccess::preCodeGen() to create expressions
// for determining the beginning and ending partition numbers that
// partition access must deal with.  Other partitioning schemes
// pass partition numbers, but Hash2 passes hash boundaries.  Hash2
// passes hash boundaries instead to allow a single ESP to work with
// groupings of tables with different numbers of physical partitions.
// -----------------------------------------------------------------------
void Hash2PartitioningFunction::createPartSelectionExprFromSearchKey(
       const ValueId beginPartSelId,
       const ValueId endPartSelId,
       ValueIdList &partSelectionValIds) const
{
  ItemExpr *beginPartSelExpr = new (CmpCommon::statementHeap())
             Hash2Distrib(beginPartSelId.getItemExpr(),
                          partitionSelectionExprInputs()[1].getItemExpr());
  beginPartSelExpr->synthTypeAndValueId();

  ItemExpr *endPartSelExpr = new (CmpCommon::statementHeap())
             Hash2Distrib(endPartSelId.getItemExpr(),
                          partitionSelectionExprInputs()[1].getItemExpr());
  endPartSelExpr->synthTypeAndValueId();

  partSelectionValIds.insert(beginPartSelExpr->getValueId());
  partSelectionValIds.insert(endPartSelExpr->getValueId());
}

// -----------------------------------------------------------------------
// Hash2PartitioningFunction::comparePartFuncToFunc(): Compare this
// partitioning function to another hash2 function. To be 'SAME'
// must have the same number and order of partitioning key columns and
// the same number of scaled partitions.
// -----------------------------------------------------------------------
COMPARE_RESULT
Hash2PartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other);

  if (c != SAME)
    return INCOMPATIBLE;

  const Hash2PartitioningFunction *oth =
    other.castToHash2PartitioningFunction();

  // Since they compared 'SAME', oth should always exist, so this
  // test is redundant.
  //
  if(!oth)
    return INCOMPATIBLE;

  // If OLD_HASH2_GROUPING is turned on, then the original number of
  // partitions must be equal.  The OLD_HASH2_GROUPING CQD is for test
  // purposes and will most likely be removed after the new hash2
  // grouping has been tested extensively. 
  if (CmpCommon::getDefault(OLD_HASH2_GROUPING) == DF_ON &&
      getCountOfOrigHashPartitions() != oth->getCountOfOrigHashPartitions())
    return INCOMPATIBLE;

  // The normal behavior of hash2 grouping is that the scaled
  // number of partitions must be equal.
  if (getCountOfPartitions() != oth->getCountOfPartitions())
    return INCOMPATIBLE;

  // Make sure that the keys are in the same order.
  //
  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return INCOMPATIBLE;

  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
  {
    if (keyColumnList_[i] != oth->keyColumnList_[i])
      return INCOMPATIBLE;

    if (  NOT (originalKeyColumnList_[i].getType() ==
         oth->originalKeyColumnList_[i].getType() )
       )
      return INCOMPATIBLE;
  }

  return SAME;

} // Hash2PartitioningFunction::comparePartFuncToFunc()

// -----------------------------------------------------------------------
// Hash2PartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------
PartitioningFunction *
Hash2PartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  // Do not allow suggestedNewNumberOfPartitions greater than
  // the number of partitions.
  suggestedNewNumberOfPartitions =
    (suggestedNewNumberOfPartitions > numberOfPartitions_)
    ? numberOfPartitions_
    : suggestedNewNumberOfPartitions;

  // Only allow a suggestedNewNumberOfPartitions that evenly
  // divides into the number of partitions.
  CMPASSERT(suggestedNewNumberOfPartitions > 0);
  while (numberOfPartitions_ % suggestedNewNumberOfPartitions != 0)
    suggestedNewNumberOfPartitions--;

  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) {
     NodeMap* newNodeMap =
       new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
     replaceNodeMap(newNodeMap);
  }

  numberOfPartitions_ = suggestedNewNumberOfPartitions;

  return this;

} // Hash2PartitioningFunction::scaleNumberOfPartitions()

// -----------------------------------------------------------------------
// Hash2PartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
NABoolean
Hash2PartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                            Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  const Hash2PartitioningFunction *oth =
    other.castToHash2PartitioningFunction();

  // If other is not a Hash2PartitioningFunction, then it cannot
  // be a grouping of...
  if (oth == NULL)
    return FALSE;

  if (getCountOfPartitions() > oth->getCountOfPartitions() ||
      oth->getCountOfPartitions() % getCountOfPartitions() != 0)
    return FALSE;

  // For testing purposes, it was requested that the ability to
  // provide grouping similar to hash1 be provided.  If the
  // OLD_HASH2_GROUPING CQD is true, then only allow grouping
  // when the number of original partitions is the same.
  if (CmpCommon::getDefault(OLD_HASH2_GROUPING) == DF_ON &&
      getCountOfPartitions() != oth->getCountOfOrigHashPartitions())
    return FALSE;

  if (keyColumnList_.entries() != oth->keyColumnList_.entries())
    return FALSE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < keyColumnList_.entries(); i++)
  {
    if (keyColumnList_[i] != oth->keyColumnList_[i])
      return FALSE;
    if (  NOT (originalKeyColumnList_[i].getType() ==
         oth->originalKeyColumnList_[i].getType() )
       )
      return FALSE;
  }

  // This is a grouping of.  Set the maxPartsPerGroup and return TRUE.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = oth->getCountOfPartitions() / getCountOfPartitions();

  return TRUE;

} // Hash2PartitioningFunction::isAGroupingOf()

// -----------------------------------------------------------------------
// Hash2PartitioningFunction::buildPartitioningExpression()
//   - Create an expression for the Hash2 partitioning function.
// -----------------------------------------------------------------------
ItemExpr *
Hash2PartitioningFunction::buildPartitioningExpression(const ValueIdList &keyCols) const
{
  CollHeap *heap = CmpCommon::statementHeap();
  NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);

  // Build a ConstValue expression of the scaled number of partitions.
  Lng32 numParts = getCountOfPartitions();
  char buffer[20];
  sprintf(buffer, "%d", numParts);
  NAString numPartsStr("scaledNumParts");
  ItemExpr *scaledNumParts = new (heap)
     ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  // Create the hash2 partitioning function expression and return it.
  ItemExpr *partFunc = new (heap) Hash2Distrib(
          new (heap) HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)), scaledNumParts
                                              );

  return partFunc;
}

// -----------------------------------------------------------------------
// HashDistPartitioningFunction::buildPartitioningSelectionExpr
//  - build the expression used during partition selection
// -----------------------------------------------------------------------
ItemExpr *
Hash2PartitioningFunction::buildPartitioningSelectionExpr(
                             const ValueIdList &keyCols,
                             ItemExpr *numParts) const
{
  CollHeap *heap = CmpCommon::statementHeap();

  ItemExpr *partFunc = new (heap)
    Hash2Distrib(new (heap)
                 HashDistPartHash(keyCols.rebuildExprTree(ITM_ITEM_LIST)),
                 numParts);

  return partFunc;
}

// A refactored method whose implementaion is originally contained inside
// method HashPartitioningFunction::createPartitioningExpression().
ItemExpr* 
PartitioningFunction::getCastedItemExpre(ItemExpr* iv, const NAType& oType, CollHeap* heap) 
{
   ItemExpr *dataConversionErrorFlag = getConvErrorExpr();
   if (dataConversionErrorFlag == 0)
       {
          dataConversionErrorFlag =
            new (CmpCommon::statementHeap()) HostVar(
                 "_sys_repartConvErrorFlg",
                 new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
                 TRUE);
          storeConvErrorExpr(dataConversionErrorFlag);
       }

   // Note that we always generate a Narrow operator here, even
   // if it's not necessary. Because keyColumnList_[i] may be a
   // VEGReference it may not be possible to determine its
   // exact type at this time. We could eliminate some cases of
   // Narrow in the preCodeGen phase but this is not done yet.
   // Begin_Fix 10-040114-2431
   // 02/18/2004
   // changed statement below for above mentioned solution
   ItemExpr* c = new (CmpCommon::statementHeap()) Narrow(
         iv,
         dataConversionErrorFlag,
         &oType,
         ITM_NARROW,
         FALSE,
         TRUE); // TRUE => make nullability of Narrow same as child's
   // End_Fix 10-040114-2431

   ((Narrow*)c)->setMatchChildType(TRUE);
   return c;
}


// ***********************************************************************
// SkewedDataPartitioningFunction
// ***********************************************************************
  
SkewedDataPartitioningFunction::SkewedDataPartitioningFunction(
        PartitioningFunction* partFuncForUnskewed,
                              const skewProperty& sk,
                              NAMemory* heap
                                )
: PartitioningFunction(SKEWEDDATA_PARTITIONING_FUNCTION, NULL, heap), 
  partialPartFunc_(partFuncForUnskewed), skewProperty_(sk),
  skewHashList_(NULL)
{
  CMPASSERT(NOT sk.isAnySkew());
  CMPASSERT(partFuncForUnskewed->canHandleSkew());
  ValueIdList pivl;
  ValueIdSet pivs;

  // Create the partition input variable and partitioning key for this
  // function. The main purpose of this is to produce a partitioning key.
  // The skew buster partitioning function doesn't have a real
  // partitioning key. So, one cannot really decide from a row in
  // which partition it is (at least not for a skew element row).
  // Because we need to have some partitioning key, we basically say
  // "you have to know the partition number in order to compute the
  // partition number". This is another way of saying that the
  // partition number cannot be computed from the row. We achieve this
  // by making the PIV the partitioning key.
  // If needed, we can also make the PIV the partitioning expression. We
  // cannot make a partitioning key expression with this method, however.
  createPIV(pivl);
  pivs = pivl;
  setPartKey(pivs);
}

void SkewedDataPartitioningFunction::createPIV(ValueIdList &partInputValues)
{
  // Create a single partition input value
  //
  ItemExpr *dummyPIV = new (CmpCommon::statementHeap())
    HostVar("_sys_dummySkewBusterPartNo",
	    new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
	    TRUE);
  dummyPIV->synthTypeAndValueId();

  // the partition input value is one integer, which is also used as
  // the partitioning key
  partInputValues.insert(dummyPIV->getValueId());

  // Store the partition input values.
  //
  storePartitionInputValues(partInputValues);
}

void SkewedDataPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
    partialPartFunc_ -> replacePivs(newPivs, newPartKeyPreds);
}

SkewedDataPartitioningFunction::SkewedDataPartitioningFunction(
    const SkewedDataPartitioningFunction& other, NAMemory* heap)
: PartitioningFunction(other, heap), 
  skewHashList_(NULL),                      // recompute this
  partialPartFunc_(other.partialPartFunc_), // may need a deep copy
  skewProperty_(other.skewProperty_)        // share the skew property
{
}

Lng32 SkewedDataPartitioningFunction::getCountOfPartitions() const
{
   return partialPartFunc_->getCountOfPartitions();
}

void SkewedDataPartitioningFunction::createPartitioningKeyPredicates()
{
  // do nothing, there aren't any partitioning key preds for a skew
  // partition
  storePartitioningKeyPredicates(ValueIdSet());
  partialPartFunc_ -> createPartitioningKeyPredicates();
}

PartitioningRequirement*
SkewedDataPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap()) RequireSkewed(this);
}

PartitioningFunction*
SkewedDataPartitioningFunction::copy() const
{
  SkewedDataPartitioningFunction *result;

  result = new (CmpCommon::statementHeap())
    SkewedDataPartitioningFunction(*this);
  result->partialPartFunc_ = partialPartFunc_->copy();

  return result;
}

// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
const NAString SkewedDataPartitioningFunction::getText() const
{
  NAString result = partialPartFunc_->getText();

  // SKEW_EXPLAIN = off --> do no display any details on skew-processing
  // method or skew value list.
  if ( CmpCommon::getDefault(SKEW_EXPLAIN) == DF_OFF )
    return result;

  NAString pat, suffix;

  switch ( skewProperty_.getIndicator() ) {
     case skewProperty::UNIFORM_DISTRIBUTE:
       suffix = "-ud";
       break;
     case skewProperty::BROADCAST:
       suffix = "-br";
       break;
     default:
       break;
  }

  switch (partialPartFunc_ -> getPartitioningFunctionType()) {
    case HASH_PARTITIONING_FUNCTION:
       pat = "hash";
       break;
    case HASH_DIST_PARTITIONING_FUNCTION:
       pat = "hash1";
       break;
    case HASH2_PARTITIONING_FUNCTION:
       pat = "hash2";
       break;
    default:
       break;
  }
  size_t loc = result.index(pat);

  if ( loc != NA_NPOS ) {
     result.insert(loc+pat.length(), suffix);
  }

  result += skewProperty_.getText();

  return result;
}

void SkewedDataPartitioningFunction::print(FILE* ofd, const char* indent,
                                         const char* title) const
{
  PartitioningFunction::print(ofd, indent, "SkewedDataPartitioningFunction");
}  

// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::comparePartFuncToFunc(): Compare this
// partitioning function to another hash2 function. To be 'SAME'
// must have the same number and order of partitioning key columns and
// the same number of scaled partitions.
// -----------------------------------------------------------------------
COMPARE_RESULT
SkewedDataPartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  // the other has to be a SkewedDataPartitioningFunction
  const SkewedDataPartitioningFunction *oth =
    other.castToSkewedDataPartitioningFunction();

  if(!oth) return INCOMPATIBLE;

  // compare the two partial partfuncs.
  COMPARE_RESULT c = partialPartFunc_->comparePartFuncToFunc(
                         *(oth->getPartialPartitioningFunction())
                                                            );

  if (c != SAME) return INCOMPATIBLE;

  // compare the skew property last.
  if ( NOT (skewProperty_ == oth->skewProperty_) )
    return INCOMPATIBLE;


  return SAME;

} // SkewedDataPartitioningFunction::comparePartFuncToFunc()

// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::scaleNumberOfPartitions()
// -----------------------------------------------------------------------


//::scaleNUmberOfPartitions() are called in following locations
//
//  OptPhysRelExpr.cpp
//	3536 NJ::genLeftChild()
//	4892 NJ:createContextForChild() (rightPartFunc->)
//	14166 and 14262	synthDP2PhysicalProperty()
//
//  GP.cpp
//	955 GroupAttributes::recommendedOrderForNJProbing()
//
//As a result, the following version will not be called.

PartitioningFunction *
SkewedDataPartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
   PartitioningFunction *mayBeNewPartfunc = 
                      partialPartFunc_->scaleNumberOfPartitions(
                         suggestedNewNumberOfPartitions, partGroupDist);

   if ( mayBeNewPartfunc != partialPartFunc_ )
      partialPartFunc_ = mayBeNewPartfunc;
   
    return this;

} // SkewedDataPartitioningFunction::scaleNumberOfPartitions()

// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
NABoolean
SkewedDataPartitioningFunction::isAGroupingOf(const PartitioningFunction &other,
                                            Lng32* maxPartsPerGroup) const
{
   return FALSE;
} // SkewedDataPartitioningFunction::isAGroupingOf()

// -----------------------------------------------------------------------
// SkewedDataPartitioningFunction::copyAndRemap()
// -----------------------------------------------------------------------
PartitioningFunction*
SkewedDataPartitioningFunction::copyAndRemap
                         (ValueIdMap& map, NABoolean mapItUp) const
{
  SkewedDataPartitioningFunction *newPartFunc =
    (SkewedDataPartitioningFunction *)
    copy()->castToSkewedDataPartitioningFunction();

  CMPASSERT(skewProperty_.getIndicator() != skewProperty::BROADCAST);

  newPartFunc->remapIt(this, map, mapItUp);
  newPartFunc->partialPartFunc_->remapIt(partialPartFunc_, map, mapItUp);

  return newPartFunc;

} // SkewedDataPartitioningFunction::copyAndRemap()

void
SkewedDataPartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
   PartitioningFunction::preCodeGen(availableValues);
   partialPartFunc_->preCodeGen(availableValues);
}

ItemExpr* SkewedDataPartitioningFunction::createPartitioningExpression()
{
   CMPASSERT(partialPartFunc_ -> isAHash2PartitioningFunction() == TRUE);

   // cast to TableHashPartitioningFunction class (the parent class
   // of Hash2PartitioningFunction) which has the method 
   // createPartitioningExpressionImp() defined.
   ItemExpr* partExpr = ((TableHashPartitioningFunction*)partialPartFunc_)
        -> createPartitioningExpressionImp(TRUE /*do varchar cast*/);

   storeExpression(partExpr);
   return partExpr;
}

// ***********************************************************************
// RangePartitionBoundaries
// ***********************************************************************

// -----------------------------------------------------------------------
// Constructor for RangePartitionBoundaries
// -----------------------------------------------------------------------
RangePartitionBoundaries::RangePartitionBoundaries
                             (Lng32 numberOfPartitions,
			      Lng32 numberOfPartitioningKeyColumns, NAMemory *h)
    : partKeyColumnCount_(numberOfPartitioningKeyColumns),
      origPartKeyColumnCount_(numberOfPartitioningKeyColumns),
      partitionCount_(numberOfPartitions),
      origPartitionCount_(numberOfPartitions),
      boundaryStringsList_(h,numberOfPartitions+1),
      boundaryValuesList_(h,numberOfPartitions+1),
      boundaryValues_(h,numberOfPartitions+1),
      binaryBoundaryValues_(h,numberOfPartitions+1),
      encodedBoundaryKeyLength_(-1),
      setBinaryBoundaryFirstLastKey_(FALSE),
      setupForStatement_(FALSE),
      resetAfterStatement_(FALSE),
      heap_(h)
{
  // MUST be given non zero counts

  // make two dummy entries for the first and last key
  boundaryValuesList_.insertAt(0,NULL);
  boundaryValuesList_.insertAt(numberOfPartitions,NULL);

  boundaryStringsList_.insertAt(0, NULL);
  boundaryStringsList_.insertAt(numberOfPartitions,NULL);

} // RangePartitionBoundaries::RangePartitionBoundaries()

RangePartitionBoundaries::~RangePartitionBoundaries() {}

void RangePartitionBoundaries::defineUnboundBoundary
                               (Lng32 partitionNumber,
                                const ItemExpr * unboundBoundaryValue,
                                const char *encodedKeyValue)
{
  // must insert a 'true' boundary between the first and last (pre-allocated)
  // entry, this gives us n insertion points for a table with n partitions,
  // new entry delimits partition <partitionNumber>
  CMPASSERT(partitionNumber > 0 AND
      partitionNumber <= partitionCount_ AND
      unboundBoundaryValue != NULL AND
      encodedKeyValue != NULL);
  boundaryValuesList_.insertAt(partitionNumber, unboundBoundaryValue);


  // encodedKeyValue should be NULL only for SQL/MP tables
  if (encodedKeyValue)
    binaryBoundaryValues_.insertAt(partitionNumber, encodedKeyValue);
}

void RangePartitionBoundaries::bindAddBoundaryValue(Lng32 partitionNumber)
{

  if(!boundaryValuesList_[partitionNumber]){
    boundaryValues_.insertAt(partitionNumber, NULL);
    return;
  }

  //get unbound boundary value
  ItemExpr * unboundBoundaryValue = ((ItemExpr *)boundaryValuesList_[partitionNumber])->
                                    copyTree(CmpCommon::statementHeap());
                                  //new (CmpCommon::statementHeap())
                                  //ItemExpr(*boundaryValuesList_[partitionNumber]);

  //bind the boundary value
  unboundBoundaryValue->synthTypeAndValueId();

  ItemExprList * boundBoundaryValue = new (CmpCommon::statementHeap())
                                      ItemExprList(unboundBoundaryValue,
                                                   CmpCommon::statementHeap());

  boundaryValues_.insertAt(partitionNumber, boundBoundaryValue);

  if (!boundaryStringsList_.used(partitionNumber)) {
    NAString result;

    // use QUERY_FORMAT to obtain the full SQL text.
    boundBoundaryValue->unparse(result, OPTIMIZER_PHASE, QUERY_FORMAT);

    boundaryStringsList_.insertAt(partitionNumber, new (heap_) NAString(result));

  }
}

// -----------------------------------------------------------------------
// RangePartitionBoundaries::defineBoundary()
// -----------------------------------------------------------------------
void RangePartitionBoundaries::defineBoundary
                                  (Lng32 partitionNumber,
				   const ItemExprList* boundaryValue,
				   const char *encodedKeyValue)
{
  // must insert a 'true' boundary between the first and last (pre-allocated)
  // entry, this gives us n insertion points for a table with n partitions,
  // new entry delimits partition <partitionNumber>
  CMPASSERT(partitionNumber > 0 AND
	    partitionNumber <= partitionCount_ AND
	    boundaryValue != NULL AND
	    encodedKeyValue != NULL);
  boundaryValues_.insertAt(partitionNumber, boundaryValue);

  // encodedKeyValue should be NULL only for SQL/MP tables
  if (encodedKeyValue)
    binaryBoundaryValues_.insertAt(partitionNumber, encodedKeyValue);
} // RangePartitionBoundaries::defineBoundary()

// -----------------------------------------------------------------------
// RangePartitionBoundaries::checkConsistency()
// -----------------------------------------------------------------------
void RangePartitionBoundaries::checkConsistency(
     const Lng32 numberOfPartitions) const
{
  // If n = numberOfPartitions then entries 1 through n-1 have to be
  // present. Each entry delimits the boundary between two partitions.
  // Entries 0 and n contain the minimum and maximum permissible
  // values, respectively.  Entries 0 and n are initialized to NULL in
  // the constructor and later added by method
  // RangePartitioningFunction::completePartitionBoundaries()
  CMPASSERT(numberOfPartitions == partitionCount_ AND
	    (Lng32)boundaryValues_.entries() == partitionCount_ + 1);

  // check the actual boundaries after the key length of the encoded
  // key and the key column value ids have been entered, but not before
  if (encodedBoundaryKeyLength_ > 0)
    {
      CMPASSERT(boundaryValues_.entries() == binaryBoundaryValues_.entries());

      for (CollIndex index = 1;
	   index < (CollIndex)numberOfPartitions;
	   index++)
	{
	  // boundary values in SQL format and encoded key format must be
	  // filled in
	  CMPASSERT(boundaryValues_[index]);
	  CMPASSERT(binaryBoundaryValues_[index]);

	  // encoded key values must be in ascending order
//	  CMPASSERT(str_cmp(binaryBoundaryValues_[index-1],
//			    binaryBoundaryValues_[index],
//			    encodedBoundaryKeyLength_) < 0);
	}
    }
} // RangePartitionBoundaries::checkConsistency()

// -----------------------------------------------------------------------
// RangePartitionBoundaries::compareRangePartitionBoundaries()
// -----------------------------------------------------------------------
NABoolean RangePartitionBoundaries::compareRangePartitionBoundaries(
     const RangePartitionBoundaries& other,
     NABoolean groupingAllowed,
     Lng32* maxPartsPerGroup) const
{
  NABoolean match = FALSE;
  CollIndex i;
  CollIndex thisNumPartKeyCols,otherNumPartKeyCols;
  const ItemExprList *thisBoundaryValue;
  const ItemExprList *otherBoundaryValue;
  const ConstValue *thisConstValue;
  const ConstValue *otherConstValue;
  NABoolean thisIsNegated = FALSE;
  NABoolean otherIsNegated = FALSE;
  Lng32 currentPartsPerGroup = 1;
  Lng32 numOfPartsInLastGroup = 1;

  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1; // start with the minimum

  // Give up now if "other" has fewer partitions than "this"
  if ( other.partitionCount_ < partitionCount_)
  {
    return FALSE;
  }

  // NOTE: We CANNOT compare the encoded values! Two values that are
  // the SAME will compare as NOT THE SAME if the types of the
  // underlying columns are different.
  //
  // So, we must compare the boundaryValues_ instead. The
  // boundaryValues_ are an array of pointers to ItemExpr Lists
  // which contain pointers to ConstValue nodes.
  //
  // We go through the array of ItemExpr Lists, and
  // first compare the ValueId's of the ItemExpr Lists which
  // contain pointers to the ItemExpr ConstValue nodes that represent
  // the first key values. If these match, great -
  // the first keys must be the same. But, they will only have the same
  // ValueId if they are from the same table, or if the datatypes of
  // the partitioning key columns are exactly the same.
  // If they don't have the same ValueId, then we need to compare
  // the original raw ASCII text as was present in the DDL.
  // We go through the ConstValue nodes in ItemExpr lists one
  // by one, and compare them. We use the "getConstStr" method
  // of the ConstValue class to do this. This method gets the
  // original raw text as typed in by the user, and also appends
  // the character set type. This may be important if a user
  // is joining two tables where the character set types of the
  // partitioning key columns are different.
  //
  // Go through the boundaries of "this" and try to find them
  // in "other" Don't bother with the first and last entry,
  // they are always the same. When all is said and done, we
  // must have found a match in other for every partition boundary
  // from "this". The opposite assertion - that every partition
  // boundary in "other" found a match in "this" - is not necessary
  // if grouping is allowed.
  CollIndex myix = 1;
  CollIndex otherix = 1;
  while (myix < (CollIndex) partitionCount_ AND
	 otherix < (CollIndex) other.partitionCount_)
  {
    match = FALSE;
    thisBoundaryValue = boundaryValues_[myix];
    otherBoundaryValue = other.boundaryValues_[otherix];

    // Check for the simple case: are the ValueId's the same?
    if (*thisBoundaryValue == *otherBoundaryValue)
    {
      match = TRUE;
    }
    else
    {
      // Traverse the ItemExprList and compare each ConstValue node
      // in "this" to the one in "other". The first keys are the
      // same only if they both have the same number of first keys,
      // they are either both negated or neither are,
      // and the string text of each ConstValue node is
      // exactly the same.
      thisNumPartKeyCols = thisBoundaryValue->entries();
      otherNumPartKeyCols = otherBoundaryValue->entries();
      if (thisNumPartKeyCols != otherNumPartKeyCols)
      {
        // If the two partitioning functions specified a different
        // number of first key values, then they cannot possibly
        // have equivalent partitioning schemes.
        return FALSE;
      }
      else
      {
        // Hope for the best...
        match = TRUE;
        for (i = 0;i < thisNumPartKeyCols AND match; i++)
        {
          thisIsNegated = FALSE;
          otherIsNegated = FALSE;
          thisConstValue =
            ((*thisBoundaryValue)[i])->castToConstValue(thisIsNegated);
          otherConstValue =
            ((*otherBoundaryValue)[i])->castToConstValue(otherIsNegated);
          if ((thisIsNegated != otherIsNegated) OR
              thisConstValue == NULL OR
              otherConstValue == NULL OR
              (thisConstValue->getConstStr(FALSE) !=
                otherConstValue->getConstStr(FALSE)))
            match = FALSE;
        }
      }
    }

    if (match)
    {
      // Found this boundary in "other". Advance to the next
      // start key in both "this" and "other".
      myix++;
      otherix++;
      if ((maxPartsPerGroup != NULL) AND
          (currentPartsPerGroup > *maxPartsPerGroup))
      {
        *maxPartsPerGroup = currentPartsPerGroup;
      }
      currentPartsPerGroup = 1; // clear
    }
    else if (groupingAllowed)
    {
      // No match. Advance to the next start key in "other"
      // to see if that will match.
      otherix++;
      currentPartsPerGroup++;
    }
    else
    {
      // These boundaries don't match, and since grouping is not allowed,
      // we are out of luck. Give up now.
      return FALSE;
    }
  } // end while more partitions

  // Since we didn't look at the last group, we need to compute the
  // number of partitions in it and see if it is the group with
  // the largest number of partitions.
  numOfPartsInLastGroup = (other.partitionCount_ - otherix) + 1;
  if ((maxPartsPerGroup != NULL) AND
      (numOfPartsInLastGroup > *maxPartsPerGroup))
  {
    *maxPartsPerGroup = numOfPartsInLastGroup;
  }

  if (myix == (CollIndex) partitionCount_)
  {
    // We have found all of our partition boundaries
    // in the other range partition boundaries object.
    return TRUE;
  }
  else
  {
    // Even with grouping, the partition boundaries don't match. Too bad.
    return FALSE;
  }
} // RangePartitionBoundaries::compareRangePartitionBoundaries()

//<pb>
//==============================================================================
//  Merge two range partition boundaries to produce new range partition
// boundaries.  Also produce a node map corresponding to the new range partition
// boundaries.
//
// Input:
//  other         -- other partition boundary map with which to merge.
//  thisNodeMap   -- node map associated with this RangePartitionBoundaries
//                    object.
//
// Output:
//  resultNodeMap -- node map associated with resulting RangepartitionBoundaries
//                    object.
//
// Return:
//  Pointer to merged partition boundaries.
//
//==============================================================================
RangePartitionBoundaries*
RangePartitionBoundaries::merge(const RangePartitionBoundaries& other,
                                const NodeMap& thisNodeMap,
                                      NodeMap& resultNodeMap) const
{
  // need binary boundary values to do a merge
  if (encodedBoundaryKeyLength_ == 0 OR
      other.encodedBoundaryKeyLength_ == 0)
    return NULL;

  // left is "this", right is "other",
  // merge boundaries 0...entries of "this" with boundaries 1...entries-1
  // of "other" (don't use -infinity and +infinity of right)
  CollIndex thisix   = 0;
  CollIndex otherix  = 1;
  CollIndex resultix = 0;
  CollIndex maxthis  = boundaryValues_.entries();
  CollIndex maxother = other.boundaryValues_.entries()-1;
  CollIndex maxboth  = maxthis + maxother;
  RangePartitionBoundaries *result = new(CmpCommon::statementHeap())
    RangePartitionBoundaries(1,partKeyColumnCount_,CmpCommon::statementHeap());
  Lng32 compResult;

  CMPASSERT(partKeyColumnCount_ == other.partKeyColumnCount_ AND
	    encodedBoundaryKeyLength_ == other.encodedBoundaryKeyLength_);

  // continue merging boundaries while at least one index has not reached its
  //  ending value.
  while (thisix < maxthis OR otherix < maxother)
    {

      //  The final boundary has no corresponding node map entry.
      // In other words, the number of boundaries is always one more than
      // the number of node map entries.
      if (maxboth - thisix - otherix > 1)
        {

          //  Since node maps have one less entry than boundary maps, ensure
          // that the node map's index does not fall off the end of the node
          // map.
          CollIndex nodeMapIx = MINOF(thisix, thisNodeMap.getNumEntries() - 1);

          //  Store node map entry associated with  current boundary.
          const NodeMapEntry* entry = thisNodeMap.getNodeMapEntry(nodeMapIx);
          resultNodeMap.setNodeMapEntry(resultix,
                                        *entry,
                                        CmpCommon::statementHeap());
        }

      // other index reached max, so comparison is automatically "less"
      if (otherix >= maxother)
        {
          compResult = -1;
        }
      else
        {

          // this index reached max, so comparison is automatically "more"
          if (thisix >= maxthis)
            {
              compResult = 1;
            }
          else
            {

              // neither index reached max, so do an actual comparison of all
              // 'real' boundaries in the encoded representation (the first
              // and last boundaries are always the same)
              compResult = str_cmp(binaryBoundaryValues_[thisix],
			           other.binaryBoundaryValues_[otherix],
			           encodedBoundaryKeyLength_);
            }

        }

      // take the smallest boundary value and insert it into result
      if (compResult <= 0)
	{
	  // "this" has the smaller (or equal) boundary value
	  result->boundaryValues_.insertAt(
	       resultix,
	       boundaryValues_[thisix]);
	  result->binaryBoundaryValues_.insertAt(
	       resultix,
	       binaryBoundaryValues_[thisix]);
	  thisix++;

	  // if the boundaries are identical, skip both of them
	  if (compResult == 0)
	    otherix++;
	}
      else
	{
	  // "other" has the smaller boundary value
	  result->boundaryValues_.insertAt(
	       resultix,
	       other.boundaryValues_[otherix]);
	  result->binaryBoundaryValues_.insertAt(
	       resultix,
	       other.binaryBoundaryValues_[otherix]);
	  otherix++;
	}

      // Increment result index for next boundary.
      resultix++;

    }

  result->encodedBoundaryKeyLength_ = encodedBoundaryKeyLength_;
  result->partitionCount_ = resultix - 1;

  return result;
} // RangePartitionBoundaries::merge

Lng32 RangePartitionBoundaries::getOptimizedNumberOfPartKeys()
{
  // Do an optimization of the partitioning key: what is passed in is
  // currently the clustering key. Count the actual number of first
  // keys that are specified in the DDL and only use the max. number
  // of items instead of all clustering key columns. Example:
  //
  // create table t (a int, b int, c int, primary key (a,b desc,c)
  // partition (add first key (1), add first key (2,2));
  //
  // In this case, the partitioning key order is not (a,b desc,c), but
  // (a, b desc). This optimization should be done in CATMAN. Remove
  // this method once that is done.
  //
  CollIndex numPartKeyCols = 0;
  for (CollIndex i = 1; i < (CollIndex) partitionCount_; i++)
    {
      // Are there more values listed than we assume as partitioning
      // key columns? Adjust our assumption if needed.

      if ( boundaryValues_[i] ) {
         CollIndex numKeySpecs = boundaryValues_[i]->entries();
         if (numKeySpecs > numPartKeyCols)
            numPartKeyCols = numKeySpecs;
      } else {
        // no boundary values specified in SQL syntax
        // (e.g. when we got binary region boundaries only)
        numPartKeyCols = partKeyColumnCount_;
        break;
      }
    }

  return numPartKeyCols;
}

Lng32 RangePartitionBoundaries::scaleNumberOfPartitions(
     Lng32 suggestedNewNumberOfPartitions,
     const NodeMap* nodeMap,
     PartitionGroupingDistEnum partGroupDist)
{

  //  * * * * * * * * * * * * *  I M P O R T A N T  * * * * * * * * * * * * * *
  //  *                                                                       *
  //  *  THIS MEMBER FUNCTION USES THE SAME GROUPING ALGORITHM AS MEMBER      *
  //  * FUNCTION NodeMap::deriveGrouping().  ANY CHANGES TO THIS MEMBER       *
  //  * FUNCTION WOULD NECESSITATE CHANGES TO MEMBER FUNCTION                 *
  //  * NodeMap::deriveGrouping() AS WELL.                                    *
  //  *                                                                       *
  //  * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

  // For now, the only way of scaling the number of partitions
  // is by eliminating part of the boundaries (deleting all but
  // every i-th boundary where i is an integer number).

  // There are two problems with this limitation: the suggestion may
  // ask for an increase in the number of partitions, or the suggestion
  // may cause an imbalance because the suggested number times i is not
  // equal to the existing number of partitions.
  // Here are the heuristics that we are using until we get smarter:
  //
  // H1: Treat the suggested number as an upper limit of new partitions.
  //     Often, the suggestion is the number of CPUs available and plans
  //     with 13 partitions don't run very well on 12 CPUs.
  //
  // H2: Distribute the old partitions such that the number of old partitions
  //     for each new partition varies at most by one (a new partition has
  //     either o or o+1 old partitions in it).
  //
  // We have two ways of distributing the partitions: "uniformly" (off
  // by at most one) distribute the number of physical partitions
  // amongst the groups, or uniformly distribute the number of active
  // partitions amongst the groups. The latter is preferable if there
  // are many inactive partitions and the active partitions are not
  // uniformly distributed amongst them. Actually, if the active
  // partition estimate is correct, we cannot go wrong distributing
  // the partitions based on the number of active partitions.

  // In cases where this would combine partitions from two or more clusters
  // it is better not to form such groups, as this would limit autonomy
  // and also lead to more messages between clusters. In such cases, this 
  // method will return with the original partition boundary arrays.

  // Make copies in case we need to undo.

  ARRAY(const ItemExprList *) undoBoundaryValues(boundaryValues_);
  ARRAY(const char *)  undoBinaryBoundaryValues(binaryBoundaryValues_);

  // As already said, we can't increase the number of partitions
  if (suggestedNewNumberOfPartitions >= partitionCount_)
    return partitionCount_;

  CMPASSERT(suggestedNewNumberOfPartitions > 1); // sanity check

  NABoolean useAPDistribution = FALSE;

  // Determine partition grouping distribution algorithm to use
  if (partGroupDist == DEFAULT_PARTITION_GROUPING)
  {
    if (CmpCommon::getDefault(BASE_NUM_PAS_ON_ACTIVE_PARTS) == DF_ON)
      useAPDistribution = TRUE;
    else
      useAPDistribution = FALSE;
  }
  else if (partGroupDist == UNIFORM_PHYSICAL_PARTITION_GROUPING)
  {
    useAPDistribution = FALSE;
  }
  else if (partGroupDist == UNIFORM_ACTIVE_PARTITION_GROUPING)
  {
    useAPDistribution = TRUE;
  }

  Lng32 newix = 1;
  Lng32 prevPartStart = 0;

  if (useAPDistribution)
  {
    // Uniform number of active partitions grouping

    CostScalar activePartitions =
      ((NodeMap *)(nodeMap))->getNumActivePartitions();
    Lng32 numActiveParts = (Lng32)activePartitions.getValue();

    if (suggestedNewNumberOfPartitions > numActiveParts)
      return partitionCount_;

    Lng32 currentNumActiveParts = 0;
    Lng32 numActivePartsPerGroup = numActiveParts / suggestedNewNumberOfPartitions;
    Lng32 remainder = numActiveParts % suggestedNewNumberOfPartitions;
    Lng32 oldix = 1;
    Lng32 partNum = 0;
    NodeMapEntry::PartitionState partState;
    NABoolean isPartActive = FALSE;

    while ((partNum < partitionCount_) AND
           (newix != suggestedNewNumberOfPartitions))
    {
      // Determine if the current partition is active
      partState = nodeMap->getNodeMapEntry((CollIndex)partNum)->
                    getPartitionState();
      isPartActive = ((partState == NodeMapEntry::ACTIVE) OR
        (partState == NodeMapEntry::ACTIVE_NO_DATA));

      if ((remainder != 0) AND
          ((newix - 1) + remainder == suggestedNewNumberOfPartitions))
      {
        // all the remaining groups get one extra active partition
        numActivePartsPerGroup++;
        remainder = 0; // so we won't do it again
      }

      if (isPartActive)
      {
        // one more active partition for this group
        currentNumActiveParts++;
        if (currentNumActiveParts == numActivePartsPerGroup)
        {
          // full group - time to end it, so check that partns on same cluster
          if (nodeMap->isMultiCluster(prevPartStart, oldix, TRUE))
          {
            // More than one cluster in a group - undo.
            boundaryValues_ = undoBoundaryValues;
            binaryBoundaryValues_ = undoBinaryBoundaryValues;
            return partitionCount_;
          }
          // Transfer the values over
          boundaryValues_[newix] = boundaryValues_[oldix];
          binaryBoundaryValues_[newix] = binaryBoundaryValues_[oldix];
          // advance to prepare for next new partition boundary
          newix++;
          // reset number of active partitions in the current group
          currentNumActiveParts = 0;
          // remember where that previous group started
          prevPartStart = oldix;
        }
      } // end if current partition is active
      // advance to next old partition boundary
      oldix++;
      // advance to next partition
      partNum++;
    } // end while more partitions and more new boundaries to produce

  }
  else  // Uniform number of physical partitions grouping
  {
    // Determine an integer value i such that partitionCount_ / i is
    // approximately equal to suggestedNewNumberOfPartitions.
    Lng32 numPartsPerGroup = partitionCount_ / suggestedNewNumberOfPartitions;
    Lng32 remainder = partitionCount_ % suggestedNewNumberOfPartitions;

    // If the remainder is not zero we will end up with an imbalanced new
    // partitioning scheme. To reduce the imbalance, distribute the remainder
    // partitions such that at most one of them goes into each new partition.
    // The easiest way to do this is to give one extra old partition to the
    // last <remainder> of the new partitions.

    // Remember that entries 0 and partitionCount_ stand for the min and
    // max key and should be preserved. For the entries in between, take
    // only every <numPartsPerGroup> one and make a new, contiguous array.
    Lng32 oldix = numPartsPerGroup;
    while (oldix < partitionCount_)
    {
      if (nodeMap->isMultiCluster(prevPartStart, oldix, FALSE))
      {
        // More than one cluster in a group - undo.
        boundaryValues_ = undoBoundaryValues;
        binaryBoundaryValues_ = undoBinaryBoundaryValues;
        return partitionCount_;
      }
      boundaryValues_[newix] = boundaryValues_[oldix];
      binaryBoundaryValues_[newix] = binaryBoundaryValues_[oldix];

      if (newix + remainder == suggestedNewNumberOfPartitions)
	numPartsPerGroup++; // rest of the new partitions get one extra

      prevPartStart = oldix;
      oldix += numPartsPerGroup;
      newix += 1;
    }

  } // end if uniform number of physical partitions grouping

  // move the last entry for the max key
   if (nodeMap->isMultiCluster(prevPartStart, partitionCount_, 
                               useAPDistribution))
   {
     // More than one cluster in a group - undo.
     boundaryValues_ = undoBoundaryValues;
     binaryBoundaryValues_ = undoBinaryBoundaryValues;
     return partitionCount_;
   }
  boundaryValues_[newix] = boundaryValues_[partitionCount_];
  binaryBoundaryValues_[newix] = binaryBoundaryValues_[partitionCount_];

  // Sanity check, we should now have exactly suggestedNewNumberOfPartitions+1
  // new entries in the boundaryValues_ array.
  CMPASSERT(newix == suggestedNewNumberOfPartitions);

  // to be nice, make the rest of the array empty
  for (newix = newix+1; newix <= partitionCount_; newix++)
  {
    boundaryValues_.remove(newix);
    binaryBoundaryValues_.remove(newix);
  }

  partitionCount_ = suggestedNewNumberOfPartitions;

  return partitionCount_;
}

NABoolean RangePartitionBoundaries::isAGroupingOf(
     const RangePartitionBoundaries &other,
     Lng32* maxPartsPerGroup) const
{
  // Call the method that compares range partition boundaries, with
  // the optional parameter that indicates if grouping is OK set to TRUE.
  return compareRangePartitionBoundaries(other,TRUE,maxPartsPerGroup);
} // RangePartitionBoundaries::isAGroupingOf()

// -----------------------------------------------------------------------
// RangePartitionBoundaries::getBoundaryValues()
// -----------------------------------------------------------------------
const ItemExprList* RangePartitionBoundaries::getBoundaryValues(Lng32 index) const
{
  // It is legal to index entries in the range [0, partitionCount_].
  CMPASSERT( (index >= 0) AND (index <= partitionCount_) );
  return boundaryValues_[index];
} // RangePartitionBoundaries::getBoundaryValues()

// -----------------------------------------------------------------------
// RangePartitionBoundaries::getBinaryBoundaryValue()
// -----------------------------------------------------------------------
const char * RangePartitionBoundaries::getBinaryBoundaryValue(
     Lng32 index) const
{
  // It is legal to index entries in the range [0, partitionCount_].
  CMPASSERT( (index >= 0) AND (index <= partitionCount_) );
  return binaryBoundaryValues_[index];
} // RangePartitionBoundaries::getBinaryBoundaryValue()

// -----------------------------------------------------------------------
// RangePartitionBoundaries::completePartitionBoundaries()
// -----------------------------------------------------------------------
void RangePartitionBoundaries::completePartitionBoundaries(
     const ValueIdList& partitioningKeyOrder,
     Lng32 encodedBoundaryKeyLength)
{
  // recognize from the fact that the encoded key length is
  // not set yet that this method hasn't been called before
  CMPASSERT(encodedBoundaryKeyLength_ == -1);

  NABoolean ascending_key_column = TRUE;

  ItemExprList *firstKey = new(CmpCommon::statementHeap())
    ItemExprList(CmpCommon::statementHeap());
  ItemExprList *lastKey = new(CmpCommon::statementHeap())
    ItemExprList(CmpCommon::statementHeap());

  // set some data members that could not be set before
  partKeyColumnCount_ = partitioningKeyOrder.entries();
  encodedBoundaryKeyLength_ = encodedBoundaryKeyLength;

  Lng32 i;

  // for each partitioning key column
  for (i = 0; i < partKeyColumnCount_; i++)
    {
      // get the type of the partitioning key column
      const NAType &prototypeType = partitioningKeyOrder[i].getType();

      ItemExpr *minVal = new(CmpCommon::statementHeap())
	                      SystemLiteral(&prototypeType,
					    TRUE /*want min*/,
					    TRUE /*allow NULL*/);
      ItemExpr *maxVal = new(CmpCommon::statementHeap())
	                      SystemLiteral(&prototypeType,
					    FALSE,
					    TRUE);

      ItemExpr * orderExpr = partitioningKeyOrder[i].getItemExpr();

      // add the min/max values to the first and last key
      if (orderExpr->getOperatorType() != ITM_INVERSE)
	{
	  // Ascending key column, create min value for first key and
	  // max value for last key
          ascending_key_column = TRUE;
	  firstKey->insert(minVal);
	  lastKey->insert(maxVal);
	}
      else
	{
	  // Descending key column, create max value for first key and
	  // min value for last key
          ascending_key_column = FALSE;
	  firstKey->insert(maxVal);
	  lastKey->insert(minVal);
	}

      // Check if any first key values don't specify this partitioning
      // key column. If so, add the MIN or the MAX value for that
      // column as the missing first key value.
      for (CollIndex j = 1; j < (CollIndex)partitionCount_; j++)
      {
        if (boundaryValues_[j]->entries() == (CollIndex)i)
        {
          // This first key does not have enough entries to have
          // specified a value for this column. Add it. Add the
          // MIN value if ascending, add the MAX value if descending.
          // Need to cast away the const-ness to do this - sorry.
          if (ascending_key_column)
            ((ItemExprList*)boundaryValues_[j])->insert(minVal);
          else
            ((ItemExprList*)boundaryValues_[j])->insert(maxVal);
        }
      }
    } // for each partitioning key column

  // finally, add the created begin and end key as the first key and
  // after the last start key
  boundaryValues_[0] = firstKey;
  boundaryValues_[partitionCount_] = lastKey;

  // do we have any binary boundary values at all?
  if (binaryBoundaryValues_.entries())
    {
      if(!setBinaryBoundaryFirstLastKey_)
      {
      // yes, make entries for binary encoded keys with all zeroes and
      // all ones for the first and last key, respectively
      char * binaryMin =
	             new(heap_) char[encodedBoundaryKeyLength_];
      char * binaryMax =
	             new(heap_) char[encodedBoundaryKeyLength_];
      str_pad(binaryMin,encodedBoundaryKeyLength_,'\0');
      str_pad(binaryMax,encodedBoundaryKeyLength_,'\377');
      binaryBoundaryValues_.insertAt(0,binaryMin);
      binaryBoundaryValues_.insertAt(partitionCount_,binaryMax);
        setBinaryBoundaryFirstLastKey_ = TRUE;
      }
    }
  else
    {
      // indicate that there are no binary key values (happens for SQL/MP)
      encodedBoundaryKeyLength_ = 0;
    }
  checkConsistency(partitionCount_);
} // RangePartitionBoundaries::completePartitionBoundaries()


ItemExpr * getRangePartitionBoundaryValues
                        (const char * keyValueBuffer,
                         const Lng32   keyValueBufferSize,
                         NAMemory* heap,
                          CharInfo::CharSet strCharSet = CharInfo::UTF8
                         );

void RangePartitionBoundaries::setupForStatement(NABoolean useStringVersion)
{
  if(setupForStatement_)
    return;

  if ( useStringVersion ) {

    for(UInt32 i=0; i < boundaryStringsList_.entries(); i++)
    {
       if (!boundaryStringsList_.used(i) || !boundaryStringsList_[i]) 
       {
         boundaryValuesList_.insertAt(i,NULL);
       } else {
         boundaryValuesList_.insertAt(i,
                                   getRangePartitionBoundaryValues
                                   ( boundaryStringsList_[i]->data(),
                                     boundaryStringsList_[i]->length(),
                                     
        // Warning: use stmt heap so that the returned itemExpr is NOT
        // allocated on heap_, which can be the context heap.
        // If the allocation is on the context heap, the memory
        // will not be deleted (no way to delete a complicated itemExpr) 
        // during RangePartitionBoundaries::resetAfterStatement() call and
        // we can fail the assertion test
        // CMPASSERT(sizeAfterLastStatement_ <= initialSize_ )
        // in RangePartitionBoundaries::resetAfterStatement()

                                     CmpCommon::statementHeap() 
                                   ));
       }
    }
  }

  for(UInt32 i=0; i < boundaryValuesList_.entries(); i++)
  {
    bindAddBoundaryValue(i);
  }

  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}

void RangePartitionBoundaries::resetAfterStatement()
{
  if(resetAfterStatement_)
    return;

  boundaryValuesList_.clear();
  boundaryValues_.clear();
  partKeyColumnCount_ = origPartKeyColumnCount_;
  partitionCount_ = origPartitionCount_;
  setupForStatement_ = FALSE;
  resetAfterStatement_ = TRUE;
}

// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
void RangePartitionBoundaries::print(FILE* ofd, const char* indent,
				     const char* title) const
{
  BUMP_INDENT(indent);
  char S[500];
  int index;

  fprintf(ofd,"%s %s\n",NEW_INDENT,title);
  for (index = 0; index < partitionCount_; index++)
    {
      const ItemExprList* iel = boundaryValues_[index];
      snprintf(S, sizeof(S), "boundary[%d]: ", index);
      if (iel)
	iel->print(ofd, indent, S);
      else
	fprintf(ofd,"%s %s is empty\n",NEW_INDENT,S);
    }
  for (int index2 = 0; index2 < partitionCount_; index2++)
    if (binaryBoundaryValues_.used(index2) &&
        binaryBoundaryValues_[index2])
      {
        const char *binaryVal = binaryBoundaryValues_[index2];

        fprintf(ofd, "binary boundary[%d]: 0x", // %#0*",
                index2);

        for (int b=0; b<encodedBoundaryKeyLength_; b++)
          fprintf(ofd, "%02hhx", binaryVal[b]);
        fprintf(ofd, "\n");
      }

  fprintf(ofd,"%s %s (in binary form)\n",NEW_INDENT,title);
  Lng32 keyLen = getEncodedBoundaryKeyLength();
  for (index = 0; index < partitionCount_; index++)
    {
       const char* bValues = getBinaryBoundaryValue(index);
       for (Int32 j = 0; j < keyLen; j++) {
	 fprintf(ofd,"%04x ", (int)bValues[j]);
       }
    }

} // RangePartitionBoundaries::print()

// ***********************************************************************
// RangePartitioningFunction
// ***********************************************************************

RangePartitioningFunction::RangePartitioningFunction(
     const RangePartitioningFunction& other,
     NAMemory* heap)
     : PartitioningFunction(other, heap),
       keyColumnList_(other.keyColumnList_),
       orderOfKeyValues_(other.orderOfKeyValues_),
       originalKeyColumnList_(other.originalKeyColumnList_),
       setupForStatement_(FALSE),
       resetAfterStatement_(FALSE)
{
  partitionBoundaries_ = new(heap)
    RangePartitionBoundaries(*(other.partitionBoundaries_),heap);
}

RangePartitioningFunction::~RangePartitioningFunction() {}

Lng32 RangePartitioningFunction::getCountOfPartitions() const
                  { return partitionBoundaries_->getCountOfPartitions(); }

const RangePartitioningFunction*
RangePartitioningFunction::castToRangePartitioningFunction() const
                                                          { return this; }

PartitioningRequirement*
RangePartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireRange(this);
}

PartitioningFunction*
RangePartitioningFunction::copy() const
{ return new(CmpCommon::statementHeap()) RangePartitioningFunction(*this); }

// -----------------------------------------------------------------------
// RangePartitioningFunction::normalizePartitioningKeys()
// Rewrite the partitioning keys of the partitioning function in
// terms of the VEGReference for the VEG to which the partitioning
// key column belongs.
// -----------------------------------------------------------------------
void RangePartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  PartitioningFunction::normalizePartitioningKeys(normWARef);
  keyColumnList_.normalizeNode(normWARef);
  orderOfKeyValues_.normalizeNode(normWARef);
  // don't normalize original key col list, avoid VEGies which could
  // cause data type changes.
} // RangePartitioningFunction::normalizePartitioningKeys

// -----------------------------------------------------------------------
// RangePartitioningFunction::comparePartFuncToFunc()
// -----------------------------------------------------------------------
COMPARE_RESULT
RangePartitioningFunction::comparePartFuncToFunc
                               (const PartitioningFunction &other) const
{
  // Make use of the implementation of isAGroupingOf. According to the
  // definition of a grouping, two partitioning functions must be the
  // same if one is a grouping of the other and if they have the same
  // number of partitions. This is not a very straightforward way to
  // compare two range partitioning functions but it avoids
  // duplication of code.

  if (isAGroupingOf(other) AND
      getCountOfPartitions() == other.getCountOfPartitions())
    return SAME;
  else
    return INCOMPATIBLE;

} // RangePartitioningFunction::comparePartFuncToFunc()

NABoolean RangePartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  const RangePartitioningFunction *oth =
    other.castToRangePartitioningFunction();

  // a range partitioning function can only be a grouping of
  // another range partitioning function
  if (oth == NULL)
    return FALSE;

  // to be a grouping of the other function, this function has to be
  // partitioned on the same columns in the same sequence and order
  // (a prefix might work in some cases which we don't recognize
  // at this point, sorry)
  if (orderOfKeyValues_.entries() != oth->orderOfKeyValues_.entries())
    return FALSE;

  // compare the key columns and their order
  for (CollIndex i = 0; i < orderOfKeyValues_.entries(); i++)
    {
      if (orderOfKeyValues_[i] != oth->orderOfKeyValues_[i])
	{
	  // value ids are different, but this may just be that we
	  // have different inversion expressions
	  if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() !=
                                                          ITM_INVERSE OR
	      oth->orderOfKeyValues_[i].getItemExpr()->getOperatorType() !=
                                                          ITM_INVERSE OR
	      orderOfKeyValues_[i].getItemExpr()->child(0) !=
	      oth->orderOfKeyValues_[i].getItemExpr()->child(0))
	    return FALSE; // really different columns
	}
    }

  // -----------------------------------------------------------------
  // Compare the partition boundaries
  // -----------------------------------------------------------------
  return partitionBoundaries_->isAGroupingOf(*oth->partitionBoundaries_,
                                             maxPartsPerGroup);
}

// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void RangePartitioningFunction::createPartitioningKeyPredicates()
{
  if (NOT partKeyPredsCreated())
    {
      char fabricatedName[20]; // a name for the host variable
      char* S = fabricatedName;

      CollIndex nCols = keyColumnList_.entries();
      ValueIdSet setOfKeyPredicates;
      ValueIdList partInputValues(2*nCols+1);
      ValueIdList loValues;
      ValueIdList hiValues;

      // -----------------------------------------------------------------
      // create partition input variables: one low value for each range
      // key column, one hi value for each range key column, and one
      // inclusion indicator column
      // -----------------------------------------------------------------

      for (CollIndex beginEnd = 0; beginEnd < 2; beginEnd++)
	for (CollIndex i = 0; i < nCols; i++)
	  {
	    ValueId vid = originalKeyColumnList_[i];

	    // -- Fabricate a name for the first host variable
	    if (beginEnd == 0)
	      sprintf(S,"_sys_HostVarLo%d",i);
	    else
	      sprintf(S,"_sys_HostVarHi%d",i);

	    // -- Build a host var node with the key column's type
	    HostVar *hv = new(CmpCommon::statementHeap())
	      HostVar(S, &(vid.getType()), TRUE);
	    hv->synthTypeAndValueId();

	    // insert the partition input variable
	    partInputValues.insert(hv->getValueId());
	    if (beginEnd == 0)
	      loValues.insert(hv->getValueId());
	    else
	      hiValues.insert(hv->getValueId());
          }

      // The inclusion indicator is a 4-byte integer, no NULLs allowed.
      // At execution time it is set to a non-zero value if the end
      // key is excluded and to a zero value if it is included.
      ItemExpr *intervalExclusionIndicator =
	new(CmpCommon::statementHeap()) HostVar(
	     "_sys_hostVarExclRange",
	     new(CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
	     TRUE);
      intervalExclusionIndicator->synthTypeAndValueId();
      partInputValues.insert(intervalExclusionIndicator->getValueId());

      // -----------------------------------------------------------------
      // Create the semantic equivalent of multi-valued predicates, taking
      // the order of keys into account (which can't be expressed in
      // "normal" multi-valued predicates). Set the "special nulls"
      // property for all comparison operators that involve nullable
      // columns, since we do not want to filter out any NULL values.
      // -----------------------------------------------------------------

      // -----------------------------------------------------------------
      // a GREATER_EQ multi-valued predicate for the begin key
      // -----------------------------------------------------------------
      CollIndex i = keyColumnList_.entries()-1;
      CollHeap *h = CmpCommon::statementHeap();

      // the comparison operator for the current column, taking the
      // order of the column into account
      OperatorTypeEnum compOp,extraOp;
      ItemExpr *kc = keyColumnList_[i].getItemExpr();
      NABoolean nullable =
	keyColumnList_[i].getType().supportsSQLnullLogical();

      // start with a greater or equal comparison for the last key
      // column (may be the only key column), use less equal if the
      // column is descending
      if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	  ITM_INVERSE)
        {
	 compOp = ITM_LESS_EQ;
         extraOp = ITM_GREATER_EQ;
        }
      else
        {
	 compOp = ITM_GREATER_EQ;
         extraOp = ITM_LESS_EQ;
        }

      ItemExpr * beginPred = new(h) BiRelat(compOp,
				      kc,
				      loValues[i].getItemExpr(),
				      nullable,
                                      TRUE);


      // There are now two end predicates generated.  The tri-relational predicate
	  //   is the one that enforces the inclusive comparison for the last partition,
	  //   and only exclusive comparison for the other partitions.  Thus this
	  //   accurately models way partitions are setup.

	  // The extraEndPred has been added (HL 6/20/2001) to form a convenient key predicate
	  //   for access by a B-Tree index.  Although this is always inclusive, and thus
	  //   will select extra data (one extra uec) for all partitions except the last,
	  //   the extra data will be filtered out by the tri-relational operator.


      ItemExpr * extraEndPred = new(h) BiRelat(extraOp,
                                               kc,
                                               hiValues[i].getItemExpr(),
                                               nullable,
                                               TRUE);


      // for all other key columns, add (ai > bi) or (ai = bi) and (tfm)
      while (i--)
	{
	  if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	      ITM_INVERSE)
            {
	     compOp = ITM_LESS;
             extraOp = ITM_GREATER;
            }
	  else
            {
	     compOp = ITM_GREATER;
             extraOp = ITM_LESS;
            }
	  kc = keyColumnList_[i].getItemExpr();
	  nullable =
	    keyColumnList_[i].getType().supportsSQLnullLogical();

	  beginPred = new(h) BiLogic(
	       ITM_OR,
	       new(h) BiRelat(compOp,
			      kc,
			      loValues[i].getItemExpr(),
			      nullable,
                              TRUE),
	       new(h) BiLogic(ITM_AND,
			      new(h) BiRelat(
				   ITM_EQUAL,
				   kc,
				   loValues[i].getItemExpr(),
				   nullable,
                                   TRUE),
			      beginPred));


	  extraEndPred = new(h) BiLogic(
	       ITM_OR,
	       new(h) BiRelat(extraOp,
			      kc,
			      hiValues[i].getItemExpr(),
			      nullable,
                              TRUE),
	       new(h) BiLogic(ITM_AND,
			      new(h) BiRelat(
				   ITM_EQUAL,
				   kc,
				   hiValues[i].getItemExpr(),
				   nullable,
                                   TRUE),
			      extraEndPred));
	}

      // -----------------------------------------------------------------
      // a less or less-equal predicate for the end key
      // -----------------------------------------------------------------

      // reset to the end of the key column list
      i = keyColumnList_.entries()-1;

      // Start with a tri-relational comparison for the last key
      // column (may be the only key column), it's a LESS comparison
      // if the exclusion indicator is not zero, or LESS_EQUAL otherwise.
      // Note: replace "less" with "greater" for descending columns.
      if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	  ITM_INVERSE)
	compOp = ITM_GREATER_OR_GE;
      else
	compOp = ITM_LESS_OR_LE;

      kc = keyColumnList_[i].getItemExpr();
      nullable =
	keyColumnList_[i].getType().supportsSQLnullLogical();

      TriRelational * tr = new(h) TriRelational(
	   compOp,
	   kc,
	   hiValues[i].getItemExpr(),
	   new(h) BiRelat(ITM_NOT_EQUAL,
			  new(h) SystemLiteral(0),
			  intervalExclusionIndicator));
      tr->setSpecialNulls(nullable);
      ItemExpr *endPred = tr;

      // for all other key columns, add (ai < bi) or (ai = bi) and (tfm)
      while (i--)
	{
	  if (orderOfKeyValues_[i].getItemExpr()->getOperatorType() ==
	      ITM_INVERSE)
	    compOp = ITM_GREATER;
	  else
	    compOp = ITM_LESS;

	  kc = keyColumnList_[i].getItemExpr();
	  nullable =
	    keyColumnList_[i].getType().supportsSQLnullLogical();

	  endPred = new(h) BiLogic(
	       ITM_OR,
	       new(h) BiRelat(compOp,
			      kc,
			      hiValues[i].getItemExpr(),
			      nullable),
	       new(h) BiLogic(ITM_AND,
			      new(h) BiRelat(
				   ITM_EQUAL,
				   kc,
				   hiValues[i].getItemExpr(),
				   nullable),
			      endPred));
	}


      endPred->synthTypeAndValueId();
      beginPred->synthTypeAndValueId();
      extraEndPred->synthTypeAndValueId();

      setOfKeyPredicates += beginPred->getValueId();
      setOfKeyPredicates += endPred->getValueId();
      setOfKeyPredicates += extraEndPred->getValueId();

      // -- Store the set of key predicates and the partition input values
      // in the partitioning attributes.
      storePartitioningKeyPredicates(setOfKeyPredicates);
      storePartitionInputValues(partInputValues);
    }
} // RangePartitioningFunction::createPartitioningKeyPredicates()

// -----------------------------------------------------------------------
// RangePartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
void RangePartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // RangePartitioningFunction::replacePivs()

PartitioningFunction * RangePartitioningFunction::scaleNumberOfPartitions(
     Lng32 &suggestedNewNumberOfPartitions,
     PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  // the power of delegation
  suggestedNewNumberOfPartitions =
    partitionBoundaries_->scaleNumberOfPartitions(
       suggestedNewNumberOfPartitions,
       getNodeMap(),
       partGroupDist);

  return this;
}

// -----------------------------------------------------------------------
// RangePartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void RangePartitioningFunction::remapIt
                                  (const PartitioningFunction* opf,
			           ValueIdMap& map, NABoolean mapItUp)
{
  PartitioningFunction::remapIt(opf, map,mapItUp);

  // If we have arrived here, the original partitioning function (*opf)
  // MUST be a RangePartitioningFunction().
  CMPASSERT(opf->castToRangePartitioningFunction());

  // Clear because rewrite insists on it being so.
  keyColumnList_.clear();

  if (mapItUp)
    {
      map.rewriteValueIdListUp(keyColumnList_,
			       opf->castToRangePartitioningFunction()->
                                 keyColumnList_);
    }
  else
    {
      map.rewriteValueIdListDown(opf->castToRangePartitioningFunction()->
                                   keyColumnList_,
				 keyColumnList_);
    }

  // Clear because rewrite insists on it being so.
  orderOfKeyValues_.clear();

  if (mapItUp)
    {
      map.rewriteValueIdListUp(orderOfKeyValues_,
			       opf->castToRangePartitioningFunction()->
                                 orderOfKeyValues_);
    }
  else
    {
      map.rewriteValueIdListDown(opf->castToRangePartitioningFunction()->
                                   orderOfKeyValues_,
				 orderOfKeyValues_);
    }

  // do NOT map the originalKeyColumnList_, that's why it's called ORIGINAL

} // RangePartitioningFunction::remapIt()

// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------

PartitioningFunction*
RangePartitioningFunction::createPartitioningFunctionForIndexDesc
                         (IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  Lng32 ixColNumber;
  ValueId keyValueId;
  ValueIdSet partitioningKey;
  ValueIdList partitioningKeyList;
  ValueIdList orderOfPartKeyValues;
  RangePartitionBoundaries * partBounds;

  CollIndex i = 0;
  for (i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id list
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(i,keyValueId);

      // insert the same value id into the order list, if the column
      // is in ascending order, otherwise insert the inverse of the
      // column
      if (partKeyColumns.isAscending(i))
        {
          orderOfPartKeyValues.insert(keyValueId);
        }
      else
        {
          InverseOrder *invExpr = new(idesc->wHeap())
          InverseOrder(keyValueId.getItemExpr());
          invExpr->synthTypeAndValueId();
          orderOfPartKeyValues.insert(invExpr->getValueId());
        }
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate new range partition boundaries.
  // -----------------------------------------------------------------
  partBounds = new(idesc->wHeap()) RangePartitionBoundaries
                                   (*getRangePartitionBoundaries(),
                                    idesc->wHeap());

  // ---------------------------------------------------------------------
  // Determine the minimum number of partitioning keys based on the
  // start key values that are specified. Columns for which no explicit
  // start key values are specified need not be part of the part key.
  // Adjust our bookkeeping if necessary. Remove this logic if it can
  // be moved to CATMAN.
  // ---------------------------------------------------------------------
  CollIndex numPartKeyCols = partBounds->getOptimizedNumberOfPartKeys();
  if (partitioningKeyList.entries() > numPartKeyCols)
    {
      while (partitioningKeyList.entries() > numPartKeyCols)
	{
	  partitioningKeyList.removeAt(numPartKeyCols);
	  orderOfPartKeyValues.removeAt(numPartKeyCols);
	}
      partitioningKey.clear();
      partitioningKey.insertList(partitioningKeyList);
    }

  // -----------------------------------------------------------------
  // now compute the encoded key length of all the boundary key columns
  // (there are no fillers between the encoded key values, so it's ok
  // to just add the encoded lengths of the individual columns)
  // -----------------------------------------------------------------
  Lng32 encodedBoundaryKeyLength = 0;
  for (i = 0; i < orderOfPartKeyValues.entries(); i++)
    {
      encodedBoundaryKeyLength +=
	partitioningKeyList[i].getType().getEncodedKeyLength();
    }

  // -----------------------------------------------------------------
  // Add the first and the last boundary (0 and numberOfPartitions)
  // at the ends that do not separate two partitions
  // -----------------------------------------------------------------
  partBounds->completePartitionBoundaries(
       orderOfPartKeyValues,
       encodedBoundaryKeyLength);

  // -----------------------------------------------------------------
  // Allocate a new RangePartitioningFunction.
  // -----------------------------------------------------------------
  RangePartitioningFunction *partFunc
             = new(idesc->wHeap()) RangePartitioningFunction
                                   (partitioningKey,
                                    partitioningKeyList,
				    orderOfPartKeyValues,
                                    partBounds,
                                    getNodeMap()->copy(idesc->wHeap()));
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // RangePartitioningFunction::createPartitioningFunctionForIndexDesc()

// -----------------------------------------------------------------------
// RangePartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
ItemExpr* RangePartitioningFunction::createPartitioningExpression()
{
  if (getExpression())      // already constructed?
    return getExpression(); // reuse it!

  // create a lookup function that takes the partitioning key and looks
  // up its partition number in the array of split ranges provided by
  // this partitioning function.


  // first, encode all key values (for asc or desc order) and concatenate
  // the encoded strings
  ItemExpr *encKey = NULL;

  for (CollIndex i=0; i < keyColumnList_.entries(); i++)
    {
      NABoolean descOrder =
	(orderOfKeyValues_[i].getItemExpr()->getOperatorType() == ITM_INVERSE);
      ItemExpr *dataConversionErrorFlag = getConvErrorExpr();
      if (dataConversionErrorFlag == 0)
	{
	  dataConversionErrorFlag =
	    new (CmpCommon::statementHeap()) HostVar(
		 "_sys_repartConvErrorFlg",
		 new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), TRUE,FALSE),
		 TRUE);
	  storeConvErrorExpr(dataConversionErrorFlag);
	}

      // cast the key column to the exact type of the original key column
      const NAType &oType = originalKeyColumnList_[i].getType();
      ItemExpr *c = new (CmpCommon::statementHeap()) Narrow(
	   keyColumnList_[i].getItemExpr(),
	   dataConversionErrorFlag,
	   &oType);
      // form the key encoding of the key column (a character string)
      ItemExpr *e = new (CmpCommon::statementHeap()) CompEncode (c,descOrder);

      // concatenate the individual key encodings of the key columns
      if (encKey == NULL)
	encKey = e;
      else
	encKey = new (CmpCommon::statementHeap()) Concat(encKey,e);
    }

  // give the concatenated key encodings as an input to the range lookup function
  ItemExpr * partFunc = new (CmpCommon::statementHeap()) RangeLookup(
       encKey,this);

  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);
  return partFunc;
}

// -----------------------------------------------------------------------
// RangePartitioningFunction::shouldUseSynchronousAccess()
// -----------------------------------------------------------------------
NABoolean RangePartitioningFunction::shouldUseSynchronousAccess(
    const ReqdPhysicalProperty* rpp,
    const EstLogPropSharedPtr& inputLogProp,
    GroupAttributes* ga) const
{
  NABoolean shouldUseSynchronousAccess = FALSE;
  NABoolean synchronousAccessForced = FALSE;
  NABoolean tooManyPAs = FALSE;
  ValueIdList partKey = getOrderOfKeyValues();
  // Remove from the partitioning key any columns that are covered by
  // constants or input values.
  partKey.removeCoveredExprs(ga->getCharacteristicInputs());


  const LogicalPartitioningRequirement *lpr =
    rpp->getLogicalPartRequirement();

  // Don't do synchronous access if the user is forcing a PAPA node
  // and is not forcing the number of PAs
  if ((lpr != NULL) AND lpr->getMustUsePapa() AND
      (lpr->getNumClientsReq() == ANY_NUMBER_OF_PARTITIONS))
    return FALSE;

  // See if the user is trying to force synchronous access.
  if ((CmpCommon::getDefault(ATTEMPT_ASYNCHRONOUS_ACCESS) == DF_OFF) OR
      ((lpr != NULL) AND
       (lpr->getNumClientsReq() != ANY_NUMBER_OF_PARTITIONS) AND
       (lpr->getNumClientsReq() < getCountOfPartitions())))
  {
    shouldUseSynchronousAccess = TRUE;
    synchronousAccessForced = TRUE;
  }

  // Synchronous access is a good idea and is ok if there is a required order
  // and/or arrangement that came from an operator that is not in DP2,
  // and the required order and/or arrangement columns are a leading
  // prefix of the partitioning key columns, and enough data is being
  // returned that the PA buffers will fill up and so the PA operators
  // will block until all data from all preceding partitions (in the
  // required order) has been returned. Note that whether the required
  // order or arrangement is a leading prefix of the clustering key
  // columns will be checked later (by the satisfied method).
  //   NOTE: we cannot force synchronous access if there is a required
  // logical order and/or arrangement!  If the code below decides that
  // synchronous access is not possible, and the user is trying to
  // force synchronous access, then they are out of luck!

  if (rpp->getLogicalOrderOrArrangementFlag())
  {
    if ((rpp->getSortKey() != NULL) AND
        (rpp->getArrangedCols() != NULL))
    {
      // We have both a required order and an arrangement.
      if ((partKey.satisfiesReqdOrder(*rpp->getSortKey()) == SAME_ORDER) AND
          partKey.satisfiesReqdArrangement(*rpp->getArrangedCols()))
        shouldUseSynchronousAccess = TRUE;
      else
        shouldUseSynchronousAccess = FALSE;
    }
    else if (rpp->getSortKey() != NULL)
    {
      // Only a required order.
      OrderComparison oc = partKey.satisfiesReqdOrder(*rpp->getSortKey());
      NABoolean okToUseSerialOrder = FALSE;

      if (CmpCommon::getDefault(ATTEMPT_REVERSE_SYNCHRONOUS_ORDER) != DF_ON)
      { 
        okToUseSerialOrder = (oc == SAME_ORDER);
      }
      else
      {
        okToUseSerialOrder = (oc == SAME_ORDER || oc == INVERSE_ORDER);
      }

      if (okToUseSerialOrder)
        shouldUseSynchronousAccess = TRUE;
      else
        shouldUseSynchronousAccess = FALSE;
    }
    else if (rpp->getArrangedCols() != NULL)
    {
      // Only a required arrangement.
      if (partKey.satisfiesReqdArrangement(*rpp->getArrangedCols()))
        shouldUseSynchronousAccess = TRUE;
      else
        shouldUseSynchronousAccess = FALSE;
    }
  }

  if (shouldUseSynchronousAccess AND NOT synchronousAccessForced)
  {
    // Don't do synchronous access if there is a dp2 sort order
    // partitioning requirement, as this means we want a DP2 sort
    // order, and the main reason for requiring a DP2 sort order
    // is to avoid having to access the data synchronously.
    if (rpp->getDp2SortOrderPartReq() != NULL)
      shouldUseSynchronousAccess = FALSE;
  }

  if (shouldUseSynchronousAccess AND NOT synchronousAccessForced)
  {
    // There was a required order or arrangement that could potentially
    // need us to access the data synchronously. If synch. access is not
    // forced, see if it is a good idea or not.

    // get the maximum number of PAs per process that can be allowed.
    Int32 maxPAsPerProcess =
      (Int32) CmpCommon::getDefaultNumeric(MAX_ACCESS_NODES_PER_ESP);

    if (getCountOfPartitions() > maxPAsPerProcess)
    {
      // Get the logical part requirement, if one exists.
      PartitioningRequirement *logPartReq = NULL;
      PartitioningFunction    *requiredPartFunc = NULL;
      if (lpr != NULL)
      {
	logPartReq = lpr->getLogReq();
	if (logPartReq AND logPartReq->
	    castToFullySpecifiedPartitioningRequirement())
	  requiredPartFunc = logPartReq->
	    castToFullySpecifiedPartitioningRequirement()->
	    getPartitioningFunction();
      }

      // Determine if we might exceed the maximum number of PAs allowed
      // to a process. If we will, then we definitely want to use
      // synchronous access if there is an order to preserve.
      // The first check is the most restrictive - this is for logical
      // subpartitioning. If we do logical subpartitioning, it is possible
      // one process will have to access all the partitions, and there
      // is nothing we can do about it later.
      if (logPartReq != NULL AND
	  requiredPartFunc AND
	  requiredPartFunc->castToLogPhysPartitioningFunction() AND
	  requiredPartFunc->castToLogPhysPartitioningFunction()->
	     getLogPartType() ==
	        LogPhysPartitioningFunction::LOGICAL_SUBPARTITIONING)
	tooManyPAs = TRUE;
      else if ((logPartReq != NULL) AND
	       (getCountOfPartitions() >
		(logPartReq->getCountOfPartitions() * maxPAsPerProcess)))
	tooManyPAs = TRUE;
      else if ((getCountOfPartitions() >
		(rpp->getCountOfPipelines() * maxPAsPerProcess)))
	tooManyPAs = TRUE;
    }

    if (NOT tooManyPAs)
    {
      // Now check to see how much data will be returned. If it is less
      // than twice the amount of data that it takes to fill up all
      // the PA buffers, than we will be able to access the majority
      // of the data asynchronously before the buffers fill up. So,
      // in this case assume that it is not a good idea to access the
      // data synchronously. Otherwise, synchronous access is a good
      // idea (if it passed all the other checks) because we will be
      // accessing the majority of the data after the PA buffers fill up
      // and block, and so we will end up accessing the data synchronously
      // anyway, so we would rather not pay for all those PA buffers.

      // Compute size of data returned in KB. Allocate 16 bytes per
      // record for overhead.
      CostScalar totalsize =
        (ga->outputLogProp(inputLogProp)->getResultCardinality() *
         (ga->getRecordLength() + 16.0));

      // Compute the size of all the PA buffers.
      double buffersize = CmpCommon::getDefaultNumeric(GEN_PA_BUFFER_SIZE);
      // A PA buffer can not be larger than 56K. If the table has remote
      // partitions, then the actual limit is 31K, so we really should
      // check for remote partitions. That would require the index
      // descriptor, which we don't have.
      buffersize = MINOF (buffersize, 56000);
      // number of PA output buffers is always at least one less than the
      // total number of pa buffers, as at least one is reserved for input.
      double numOutputBuffers =
        MAXOF(CmpCommon::getDefaultNumeric(GEN_PA_NUM_BUFFERS) - 1,1);
      // Subtract 2000 bytes per buffer for headers.
      buffersize = (buffersize *  numOutputBuffers) -
                   (2000 * numOutputBuffers);

      if (totalsize < (getCountOfPartitions() * 2.0 * buffersize))
        shouldUseSynchronousAccess = FALSE;
    }
  } // end if order or arrangement that could be preserved with synch. access

  return shouldUseSynchronousAccess;
}

SearchKey *
RangePartitioningFunction::createSearchKey(const IndexDesc *indexDesc,
                                           ValueIdSet availInputs,
                                           ValueIdSet additionalPreds) const
{
  ValueIdSet preds(getPartitioningKeyPredicates());

  ValueIdSet nonKeyColumnSet; // empty set

  availInputs += getPartitionInputValues();
  preds += additionalPreds;

  // make a search key from all that
  SearchKey *partSearchKey = new (CmpCommon::statementHeap())
    SearchKey(
              indexDesc->getPartitioningKey(),
              indexDesc->getOrderOfPartitioningKeyValues(),
              availInputs,
              TRUE, // there isn't really a scan direction for partKey
              preds,
              this,
              nonKeyColumnSet,
              indexDesc);

  return partSearchKey;
}

/////////////////////////////////////////////////////////////
// Test condition C2.2 (co-partitioned), which is defined as:
//  1. All tables share the same number of partitioned key columns;
//  2. Corresponding partition key columns across all tables are
//     of same type;
//  3. All tables should be partitioned into the same number of ranges;
//  4. Corresponding partitions are bounded by identical upper and
//     lower boundaries and reside on the same DP2.
/////////////////////////////////////////////////////////////
NABoolean
RangePartitioningFunction::partFuncAndFuncPushDownCompatible(
	const PartitioningFunction& x) const
{
   const RangePartitioningFunction* other =
	x.castToRangePartitioningFunction();

   if ( other == NULL ) return FALSE;

   Lng32 thisKeyCount = getPartitioningKey().entries();
   Lng32 otherKeyCount = other->getPartitioningKey().entries();

   // same key count
   if (thisKeyCount != otherKeyCount)
     return FALSE;

   Lng32 thisPartCount = getCountOfPartitions();
   Lng32 otherPartCount = other->getCountOfPartitions();

   // same part count
   if (thisPartCount != otherPartCount)
     return FALSE;

   // same part column type
   for (Lng32 index = 0; index < thisKeyCount; index++)
   {
       if ( NOT getKeyColumnList()[index].getType().isCompatible
                (
                  other->getKeyColumnList()[index].getType()
                )
          )
   	return FALSE;
   }

   // same boundaries
   if ( getRangePartitionBoundaries()->
         compareRangePartitionBoundaries
          (*(other->getRangePartitionBoundaries())) == FALSE
      )
      return FALSE;

   return TRUE;
}

// -----------------------------------------------------------------------
// RangePartitioningFunction::preCodeGen()
// Rewrite the partitioning keys of the partitioning function that
// are expressed using VEGReferences in terms of the available values.
// -----------------------------------------------------------------------
void RangePartitioningFunction::preCodeGen(const ValueIdSet& availableValues)
{
  ValueIdSet noExternalInputs;
  PartitioningFunction::preCodeGen(availableValues);
  keyColumnList_.replaceVEGExpressions(availableValues, noExternalInputs, 
                                        FALSE, NULL, TRUE);
} // RangePartitioningFunction::preCodeGen()

void RangePartitioningFunction::setupForStatement()
{
  if(setupForStatement_)
    return;

  PartitioningFunction::setupForStatement();

  if(partitionBoundaries_) 
    partitionBoundaries_->setupForStatement(TRUE /* use string */);

  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}

void RangePartitioningFunction::resetAfterStatement()
{
  if(resetAfterStatement_)
    return;

  PartitioningFunction::resetAfterStatement();
  keyColumnList_.clear();
  orderOfKeyValues_.clear();
  originalKeyColumnList_.clear();

  if(partitionBoundaries_) 
    partitionBoundaries_->resetAfterStatement();
  
  setupForStatement_ = FALSE;
  resetAfterStatement_ = TRUE;
}
// -----------------------------------------------------------------------
// Methods for debugging.
// -----------------------------------------------------------------------
const NAString RangePartitioningFunction::getText() const
{
  NAString result("range partitioned ", CmpCommon::statementHeap());
  char nparts[20];

  sprintf(nparts,"%d",getCountOfPartitions());
  result += nparts;
  result += " ways on (";
  orderOfKeyValues_.unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";

  if ( partitionBoundaries_ ) {

    result += " with boundaries(";

    for (Int32 index = 0; index < partitionBoundaries_->getCountOfPartitions(); index++)
    {
      const ItemExprList* iel = partitionBoundaries_->getBoundaryValues(index);

      for (Int32 j=0; j<iel->entries(); j++) {

         ItemExpr* ie = (*iel)[j];
         if ( ie->getOperatorType() == ITM_CONSTANT ) {
            ConstValue* cv = (ConstValue*)ie;

            result += "c(";
            result += cv->getTextForQuery(QUERY_FORMAT);
            result += ")";
         } else {
            result += "nc(";
            result += ie->getText();
            result += ")";
         }

         result += " ";
      }

      if ( index < partitionBoundaries_->getCountOfPartitions()-1 )
        result += ";";
    }

    result += ")";
    /* enable this for debugging of binary key problems
    result += " binary (";
    Lng32 encodedBoundaryKeyLength = partitionBoundaries_->getEncodedBoundaryKeyLength();
    char hexDigits[4];

    for (Int32 index2 = 0; index2 < partitionBoundaries_->getCountOfPartitions(); index2++)
    {
      const char * binaryVal = partitionBoundaries_->getBinaryBoundaryValue(index2);

      if (binaryVal)
        {
          if (index2 > 0)
            result += ", ";
          result += "b(0x";
          for (int b=0; b<encodedBoundaryKeyLength; b++)
            {
              snprintf(hexDigits, sizeof(hexDigits), "%02hhx", binaryVal[b]);
              result += hexDigits;
            }
          result += ")";
        }
    }
    result += ")";
    end of code for binary keys */
  }

  return result;
}

void RangePartitioningFunction::print(FILE* ofd, const char* indent,
					const char* title) const
{
  PartitioningFunction::print(ofd, indent, "RangePartitioningFunction");

  if (orderOfKeyValues_.entries() > 0)
    {
      fprintf(ofd,"Ascending/descending order :\n");
      for (CollIndex index = 0; index < orderOfKeyValues_.entries(); index++)
	{
	  if (orderOfKeyValues_[index].getItemExpr()->getOperatorType()
	                    == ITM_INVERSE)
	    fprintf(ofd,"D ");
	  else
	    fprintf(ofd,"A ");
	  if (((index/9)* 9 == index) OR (index == orderOfKeyValues_.entries()-1))
	    fprintf(ofd,"\n");
	}
    }

  partitionBoundaries_->print(ofd, indent, title);

} // RangePartitioningFunction::print()


NABoolean 
compareEncodedKey(const char* low, const char* key, const char* high, Int32 keyLen, NABoolean checkLast)
{
    Int32 cmpLow = memcmp(low, key, keyLen);
    Int32 cmpHigh = memcmp(key, high, keyLen);

    if ( cmpLow <= 0 && cmpHigh < 0 )
       return TRUE;

    return (checkLast && cmpLow <= 0 && cmpHigh <= 0);
}

NABoolean 
compareAsciiKey(const char* low, const char* key, const char* high, Int32, NABoolean checkLast)
{
    Int32 cmpLow = strverscmp(low, key);
    Int32 cmpHigh = strverscmp(key, high);

    if ( cmpLow <= 0 && cmpHigh < 0 )
       return TRUE;

    return (checkLast && cmpLow <= 0 && cmpHigh <= 0);
}


// find a boundary pair [low, high) with smallest low value in which keys fall, and return the
// index of the boundary low. Return -1 otherwise, or the key lengths are different.
Int32 RangePartitionBoundaries::findBeginBoundary(char* encodedKey, Int32 keyLen, 
                                                  compFuncPtrT compFunc) const
{
   // boundaries are stored in entries in the range [0, partitionCount_] 
   for (Lng32 i=0; i<=partitionCount_-1; i++ ) {

       const char* low = getBinaryBoundaryValue(i);
       const char* high = getBinaryBoundaryValue(i+1);

       // test if encodedKey is in [low, high)
       if ( (*compFunc)(low, encodedKey, high, keyLen, i==partitionCount_-1) )
          return i;
   }

   return -1;
}

// find a boundary pair [low, high) with the largest low value in which keys fall, and return the
// index of the boundary low. Return -1 otherwise, or the key lengths are different.
Int32 RangePartitionBoundaries::findEndBoundary(char* encodedKey, Int32 keyLen, 
                                                compFuncPtrT compFunc) const
{
   // boundaries are stored in entries in the range [0, partitionCount_] 
   for (Lng32 i=partitionCount_-1; i>= 0; i--) {

       const char* low = getBinaryBoundaryValue(i);
       const char* high = getBinaryBoundaryValue(i+1);

       // test if encodedKey is in [low, high)
       if ( (*compFunc)(low, encodedKey, high, keyLen, i==partitionCount_-1) )
          return i;
   }

   return -1;
}

Int32 
RangePartitioningFunction::computeNumOfActivePartitions(SearchKey* skey, const TableDesc* tDesc) const
{
   const RangePartitionBoundaries* boundaries = getRangePartitionBoundaries();

   Int32 origPartitions = getCountOfPartitions();
   Int32 partitions = origPartitions;
   Int32 bIndex = 0;

   const NATable* naTable = tDesc->getNATable();

   if ( naTable->isHiveTable() )
     return origPartitions;  

   NABoolean isNativeHbase = (naTable->isHbaseCellTable() || naTable->isHbaseRowTable());
   compFuncPtrT compFuncPtr = ( isNativeHbase ) ? compareAsciiKey: compareEncodedKey;
  
   char* buf = NULL;
   Int32 len = 0;

   const ValueIdList& beginKey = skey->getBeginKeyValues();

   if ( beginKey.computeEncodedKey(tDesc, FALSE, buf, len) ) {

      bIndex = boundaries->findBeginBoundary(buf, len, compFuncPtr);

      if ( bIndex < 0 ) 
        return origPartitions; // error in deciding the partiton
      else
        partitions -= bIndex; // bIndex is 0 based.
   }

   const ValueIdList& endKey = skey->getEndKeyValues();

   if ( endKey.computeEncodedKey(tDesc, TRUE, buf, len) ) {

      Int32 eIndex = boundaries->findEndBoundary(buf, len, compFuncPtr);

      if ( eIndex < 0 )  // error in deciding the partition.
        return origPartitions;

      if ( eIndex >= bIndex )  //eIndex is also 0 based
        partitions -= (getCountOfPartitions() - eIndex - 1);
      else 
        return origPartitions;  // end partition is preceeding the start partition!
   }
   
   return partitions;
}



// ***********************************************************************
// LogPhysPartitioningFunction
// ***********************************************************************

LogPhysPartitioningFunction::LogPhysPartitioningFunction(
     PartitioningFunction * logPartFunc,
     PartitioningFunction * physPartFunc,
     logPartType            logPartType,
     Lng32                   numOfClients,
     NABoolean              usePapa,
     NABoolean              synchronousAccess,
     NAMemory* heap)
     : PartitioningFunction(LOGPHYS_PARTITIONING_FUNCTION,heap),
       logPartFunc_(logPartFunc),physPartFunc_(physPartFunc),
       realPartFunc_(NULL),logPartType_(logPartType),
       numOfClients_(numOfClients),usePapa_(usePapa),
       synchronousAccess_(synchronousAccess)
{
  ValueIdSet partKey(logPartFunc->getPartitioningKey());

  partKey += physPartFunc->getPartitioningKey();
  setPartKey(partKey);
}

LogPhysPartitioningFunction::~LogPhysPartitioningFunction()
{}

const LogPhysPartitioningFunction *
LogPhysPartitioningFunction::castToLogPhysPartitioningFunction() const
{
  return this;
}

Lng32 LogPhysPartitioningFunction::getCountOfPartitions() const
{
    return physPartFunc_->getCountOfPartitions();
}

PartitioningRequirement*
LogPhysPartitioningFunction::makePartitioningRequirement()
{
  CMPASSERT(physPartFunc_);
  return physPartFunc_ -> makePartitioningRequirement();
}

// -----------------------------------------------------------------------
// LogPhysPartitioningFunction::getNodeMap()
// Return appropriate node map from underlying partitioning function.
// -----------------------------------------------------------------------
const NodeMap*
LogPhysPartitioningFunction::getNodeMap() const
{

  // --------------------------------------------------------------------------
  //  If no "real" partitioning function exists, extract node map from
  // underlying physical partitioning function; otherwise extract node map from
  // "real" partitioning function.
  // --------------------------------------------------------------------------
  if (realPartFunc_ == NULL)
    {
      return physPartFunc_->getNodeMap();
    }
  else
    {
      return realPartFunc_->getNodeMap();
    }

} // PartitioningFunction::getNodeMap()

// get any existing (logical or physical) nodemap (or synthesize one) that 
// matches logPartFunc_'s partition count requirement
NodeMap* LogPhysPartitioningFunction::getOrMakeSuitableNodeMap
(NABoolean forESP) const
{
  // we are the LogPhysPartitioningFunction of the synthesized physical 
  // property of an Exchange's child (that executes in DP2).

  // if logical partn func's nodemap entry count == log partn func partn count
  // then return logical partn func's nodemap
  NodeMap *nodemap = (NodeMap*)logPartFunc_->getNodeMap();
  ULng32 partCnt=(ULng32)logPartFunc_->getCountOfPartitions();
  if (nodemap && nodemap->getNumEntries() == partCnt) {
    // the logical partitioning of the result already has a suitable nodemap.
    // fall thru and use it.
  }
  else {
    // logical partitioning function has no nodemap. try to find one in the
    // physical partitioning function of the child.

    // if phys partn func's nodemap entry count == log partn func partn count
    // then return phys partn func's nodemap.
    nodemap = (NodeMap*)physPartFunc_->getNodeMap();
    if (nodemap && nodemap->getNumEntries() == partCnt) {
      // this can happen in "select * from t035t6;" and other queries against
      // vertically partitioned tables like t035t6 (see regress/core/test035).
      // fall thru and use it.
    }
    else {
      // only as a last resort do we synthesize a new nodemap
      nodemap = nodemap->synthesizeLogicalMap(partCnt, forESP);
    }
  }
  if (logPartFunc_->castToReplicateNoBroadcastPartitioningFunction()) {
    for(CollIndex i = 0; i < partCnt; i++) {
      nodemap->setPartitionState(i, NodeMapEntry::ACTIVE);
    }
  }
  return nodemap;
}

PartitioningFunction* LogPhysPartitioningFunction::copy() const
{
  return new(CmpCommon::statementHeap()) LogPhysPartitioningFunction(*this);
}

NABoolean
LogPhysPartitioningFunction::canProducePartitioningKeyPredicates() const
{
  // this is an exception to the rule: for a LogPhysPartitioningFunction
  // we assume that there is no need to enforce the physical partitioning
  // key predicates, since each DP2 partition has only its own data
  // anyway. Therefore, for the purpose of evaluating partitioning key
  // predicates and partition input values, we deal with the logical
  // partitioning function only, and only in the case where the PA
  // node isn't doing the grouping.

  // we can do this if we can produce the "real" partitioning function
  // and if the real one can produce part key preds
  return (logPartFunc_->canProducePartitioningKeyPredicates() OR
	  logPartType_ == PA_PARTITION_GROUPING OR
	  logPartType_ == PA_GROUPED_REPARTITIONING);
}

void LogPhysPartitioningFunction::createPartitioningKeyPredicates()
{
  CMPASSERT(logPartFunc_->canProducePartitioningKeyPredicates());

  // see comment in canProducePartitioningKeyPredicates() above
  storePartitioningKeyPredicates(logPartFunc_->getPartitioningKeyPredicates());
  storePartitionInputValues(logPartFunc_->getPartitionInputValuesLayout());
}

// -----------------------------------------------------------------------
// LogPhysPartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
void LogPhysPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
  // Also overwrite them in the logical partitioning function.
  logPartFunc_->replacePivs(newPivs,newPartKeyPreds);
} // LogPhysPartitioningFunction::replacePivs()

ItemExpr* LogPhysPartitioningFunction::createPartitioningExpression()
{
  CMPASSERT(0); // should never reach here, it's not legal to do a
                // repartitioning for a logphys part func
  return NULL;
}

PartitioningFunction*
LogPhysPartitioningFunction::createRealPartitioningFunction()
{
  if (realPartFunc_)
    return realPartFunc_;

  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
    case PA_GROUPED_REPARTITIONING:
      // for partition grouping or repartitioning we never split any
      // physical partitions and we are processing only physical
      // partitions at this point (logical partitions come into the
      // picture once we reach the DP2 exchange)
      realPartFunc_ = physPartFunc_;
      break;

    case LOGICAL_SUBPARTITIONING:
      {
	// Merge the split ranges of logical and physical partitioning
	// functions, so we can count the exact number of partitions.

        const RangePartitioningFunction *lpf =
	  logPartFunc_->castToRangePartitioningFunction();
	const RangePartitioningFunction *ppf =
	  physPartFunc_->castToRangePartitioningFunction();

	// Log and phys partitioning functions must have the same key
	// to be able to produce a common function
	if (lpf AND ppf AND
	    lpf->getOrderOfKeyValues() == ppf->getOrderOfKeyValues())
	  {

	    // Make a new RangePartitioningFunction object with the combined
	    // split boundaries of the two.  Also produce a corresponding
            // node map for the new boundaries.
            NodeMap *mergedMap = new (CmpCommon::statementHeap())
                                           NodeMap(CmpCommon::statementHeap());
	    RangePartitionBoundaries *rb =
	      ppf->getRangePartitionBoundaries()->merge(
		   *lpf->getRangePartitionBoundaries(),
                   *ppf->getNodeMap(),
                   *mergedMap);

	    // return the merged partitioning function if we could merge
	    if (rb)
	      realPartFunc_ = new(CmpCommon::statementHeap())
		RangePartitioningFunction(
		     ppf->getPartitioningKey(),
		     ppf->getKeyColumnList(),
		     ppf->getOrderOfKeyValues(),
		     rb,
                     mergedMap );
	  }
        else
          {
            // Special case of single partition physical partitioning function.
            if (physPartFunc_->castToSinglePartitionPartitioningFunction())
              {

                realPartFunc_ = logPartFunc_;

                //  Create a new node map.
                NodeMap *nodeMap = new(CmpCommon::statementHeap())
                                           NodeMap(CmpCommon::statementHeap());

                //  Add a copy of the single physical node map entry for
                // each partition in the logical node map.
                const NodeMapEntry *entry = physPartFunc_->getNodeMapEntry(0);
                for (CollIndex nodeIdx = 0;
                     nodeIdx < (CollIndex) logPartFunc_->getCountOfPartitions();
                     nodeIdx++)
                  {
                     nodeMap->setNodeMapEntry(nodeIdx,
                                              *entry,
                                              CmpCommon::statementHeap());
                  }
                realPartFunc_->replaceNodeMap(nodeMap);

              }
          }
      }
      break;

    default:
      break;
    }

  return realPartFunc_; // is NULL if we aren't smart enough to do this
}

COMPARE_RESULT
LogPhysPartitioningFunction::comparePartFuncToFunc
                               (const PartitioningFunction &other) const
{
  COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other);

  if (c != SAME)
    return INCOMPATIBLE;

  const LogPhysPartitioningFunction &lppfOther =
    (const LogPhysPartitioningFunction &) other;

  if (getLogPartType() != lppfOther.getLogPartType())
    return INCOMPATIBLE;

  if (getNumOfClients() != lppfOther.getNumOfClients())
    return INCOMPATIBLE;

  if (getUsePapa() != lppfOther.getUsePapa())
    return INCOMPATIBLE;

  if (getSynchronousAccess() != lppfOther.getSynchronousAccess())
    return INCOMPATIBLE;

  if (getLogPartitioningFunction()->comparePartFuncToFunc(
       *lppfOther.getLogPartitioningFunction()) != SAME)
    return INCOMPATIBLE;

  if (getPhysPartitioningFunction()->comparePartFuncToFunc(
       *lppfOther.getPhysPartitioningFunction()) != SAME)
    return INCOMPATIBLE;

  return SAME;
}

NABoolean LogPhysPartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  // Handle the trivial case of two functions that are equal. The only
  // other way to check for a grouping is to actually form the "real"
  // partitioning function and use it for the check.
  return (comparePartFuncToFunc(other) == SAME OR
	  (realPartFunc_ AND
	   realPartFunc_->isAGroupingOf(other,maxPartsPerGroup)));
}

// -----------------------------------------------------------------------
// LogPhysPartitioningFunction::remapIt()
// -----------------------------------------------------------------------
void LogPhysPartitioningFunction::remapIt
                                  (const PartitioningFunction* opf,
			           ValueIdMap& map, NABoolean mapItUp)
{
  // Currently, the only consumer of this method is
  // MapValueIds::synthPhysicalProperty, when it executes in DP2

  // Map myself
  PartitioningFunction::remapIt(opf,map,mapItUp);
  // Map the logical function
  logPartFunc_ = logPartFunc_->copyAndRemap(map,mapItUp);
  // Map the physical function
  physPartFunc_ = physPartFunc_->copyAndRemap(map,mapItUp);
} // LogPhysPartitioningFunction::remapIt()

// LogPhysPartitioningFunction::canMaintainSortOrder()
// Can this logPhys partitioning function maintain the order of an
// individual partition of the physical partitioning function.  In
// order to maintain the order, a merge expression may be required.
//
NABoolean
LogPhysPartitioningFunction::canMaintainSortOrder(const ValueIdList& sortOrder)
const
{
  // The sortOrder is the sort order of an individual partition of the
  // physical partitioning function.
  //

  
  CollIndex numSortKeys = sortOrder.entries();

  // We can always maintain, no sort order.
  //
  if(numSortKeys == 0) {
    return TRUE;
  }

  // For now, only concerned with table/index partitioning functions.
  // (Single, Range or TableHash)
  //
  const SinglePartitionPartitioningFunction *sppf = 
    physPartFunc_->castToSinglePartitionPartitioningFunction();

  // A single partition can always maintain its order.
  //
  if(sppf) {
    return TRUE;
  }

  const RangePartitioningFunction *rpf = 
    physPartFunc_->castToRangePartitioningFunction();

  const TableHashPartitioningFunction *hdpf = 
    physPartFunc_->castToTableHashPartitioningFunction();

  CMPASSERT(rpf || hdpf);

  if(!getSynchronousAccess()) {
    // Can always merge the streams, even if (in the case of
    // non-decoupled range) this results in serial access to the
    // partitions.
    //
    return TRUE;
  }

  if(rpf) {

    // For range partitioning, if the partitioning keys are a prefix
    // of the clustering key, then in order to maintain the sort order
    // the partitions must be accessed synchronously.
    //
    // If the partitioning keys are not a prefix of the clustering
    // key, then the partitions can be merged if the partitions are
    // accessed asynchronously. (This case is handled above)
    //

    // Determine if the sortkey is a prefix of the partitioning key or
    // if the partitioning key is a prefix of the sortkey.  If either
    // of these is true, the sort order can be maintained if the
    // partitions are accessed synchronously.
    //

    const ValueIdList &partKeyCols = rpf->getOrderOfKeyValues();

    // This logic may have to change when we become more sophisticated
    // in our sort keys and sort requirements.  For example, if the
    // sort req is (a/2) and the sort key is a
    //
    if (partKeyCols.entries() <= numSortKeys) {
      
      OrderComparison oc= sortOrder.satisfiesReqdOrder(partKeyCols); 
      if(oc == SAME_ORDER ||
         (CmpCommon::getDefault(ATTEMPT_REVERSE_SYNCHRONOUS_ORDER) == DF_ON &&
          oc == INVERSE_ORDER)
        )

        {
        // Range, non-decoupled, sort order == partition order,
        // synchronous access: therefore can maintain order.
        //
        return TRUE;
        }
        else {
        // Not a prefix, so in order to maintain order, must be
        // accessing the partitions asynchronously so that a merge
        // expression can be used.
        //
        return FALSE;
        }
    } else {
      if(partKeyCols.satisfiesReqdOrder(sortOrder) == SAME_ORDER) {
        // Range, non-decoupled, sort order == partition order,
        // synchronous access: therefore can maintain order.
        //
        return TRUE;
      } else {
        // Not a prefix, so in order to maintain order, must be
        // accessing the partitions asynchronously so that a merge
        // expression can be used.
        //
        return FALSE;
      }
    }
  } else {

    // For Hash, synchronous (and all other partitioning functions)
    // cannot maintain order.
    //
    return FALSE;
  }
}  // LogPhysPartitioningFunction::canMaintainSortOrder()


const NAString LogPhysPartitioningFunction::getText() const
{
  NAString result("logphys partitioned(", CmpCommon::statementHeap());

  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
      result += "grouping";
      break;
    case LOGICAL_SUBPARTITIONING:
      result += "subpartitioning";
      break;
    case HORIZONTAL_PARTITION_SLICING:
      result += "horizontal slicing";
      break;
    case PA_GROUPED_REPARTITIONING:
      result += "repartitioning";
      break;
    default:
      result += "???";
      break;
    }
  if (usePapa_)
    {
      char numPAs[20];

      result += ", PAPA with ";
      sprintf(numPAs,"%d",numOfClients_);
      result += numPAs;
      result += " PA(s)";
    }

  result += ", log="  + logPartFunc_->getText() +
            ", phys=" + physPartFunc_->getText() + ")";
  return result;
}

const NAString LogPhysPartitioningFunction::getLogForSplitTop() const
{
  NAString result("", CmpCommon::statementHeap());

  switch (logPartType_)
    {
    case PA_PARTITION_GROUPING:
      //whenever the logical partitioning function is replicate no broadcast
      //it is not truly the case of grouping
      if (logPartFunc_->isAReplicateNoBroadcastPartitioningFunction())
	result += "no grouping, ";
      else {
	char numParts[20];
	result += "grouped ";
	sprintf(numParts, "%d to %d,", logPartFunc_->getCountOfPartitions(), physPartFunc_->getCountOfPartitions());
	result += numParts;
      }
      break;
    case LOGICAL_SUBPARTITIONING:
      result += "subpartitioned, ";
      break;
    case HORIZONTAL_PARTITION_SLICING:
      result += "horizontally sliced, ";
      break;
    case PA_GROUPED_REPARTITIONING:
      result += "repartitioned, ";
      break;
    default:
      result += "???, ";
      break;
    }
  if (usePapa_)
    {
      char numPAs[20];

      result += "PAPA with ";
      sprintf(numPAs,"%d",numOfClients_);
      result += numPAs;
      result += " PA(s)";
    }
  result += ", "  + logPartFunc_->getText();

  return result;
}

const NAString LogPhysPartitioningFunction::getPhysForSplitTop() const
{
  return physPartFunc_->getText();
}

void LogPhysPartitioningFunction::print(
     FILE* ofd,
     const char* indent,
     const char* title) const
{
  PartitioningFunction::print(ofd,indent,"LogPhysPartitioningFunction");
  logPartFunc_->print(ofd,"logical:  ");
  physPartFunc_->print(ofd,"physical: ");
}

// ***********************************************************************
// RoundRobinPartitioningFunction
// ***********************************************************************

PartitioningRequirement*
RoundRobinPartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireRoundRobin(this);
}

PartitioningFunction*
RoundRobinPartitioningFunction::copy() const
{
  return new(CmpCommon::statementHeap()) RoundRobinPartitioningFunction(*this);
}

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::comparePartFuncToFunc(): Compare
// this partitioning function to another round robin function. To be
// 'SAME' must have the same number and order of partitioning key
// columns and have the same number of partitions (scaled and
// original).
// -----------------------------------------------------------------------
COMPARE_RESULT
RoundRobinPartitioningFunction::
comparePartFuncToFunc(const PartitioningFunction &other) const
{
  COMPARE_RESULT c = PartitioningFunction::comparePartFuncToFunc(other);

  if (c != SAME)
    return INCOMPATIBLE;

  const RoundRobinPartitioningFunction *oth =
    other.castToRoundRobinPartitioningFunction();

  // Since they compared 'SAME', oth should always exist, so this
  // test is redundant.
  //
  if(!oth)
    return INCOMPATIBLE;

  // They must be based on the same physical partitioning.
  //
  if (getCountOfOrigRRPartitions() != oth->getCountOfOrigRRPartitions())
    return INCOMPATIBLE;

  return SAME;
} // RoundRobinPartitioningFunction::comparePartFuncToFunc()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void RoundRobinPartitioningFunction::createPartitioningKeyPredicates()
{
  if (NOT partKeyPredsCreated())
    {
      // Create the partition input values.
      //
      ItemExpr *loPart = new (CmpCommon::statementHeap())
        HostVar("_sys_HostVarLoRoundRobinPart",
                new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
                TRUE);

      ItemExpr *hiPart = new (CmpCommon::statementHeap())
        HostVar("_sys_HostVarHiRoundRobinPart",
                new (CmpCommon::statementHeap()) SQLInt(CmpCommon::statementHeap(), FALSE, FALSE),
                TRUE);

      loPart->synthTypeAndValueId();
      hiPart->synthTypeAndValueId();

      ValueIdSet setOfKeyPredicates;
      ValueIdList partInputValues;

      // -----------------------------------------------------------------
      // The partitioning key predicate is never used for a round
      // robin partitioning so none is generated.
      // -----------------------------------------------------------------

      // the partition input values are two integer values: lo and hi part #
      partInputValues.insert(loPart->getValueId());
      partInputValues.insert(hiPart->getValueId());

      // Store the empty set of key predicate.  This will set the
      // boolean 'partKeyPredsCreated' to TRUE.
      //
      storePartitioningKeyPredicates(setOfKeyPredicates);

      // Store the partition input values.
      //
      storePartitionInputValues(partInputValues);
    }
} // RoundRobinPartitioningFunction::createPartitioningKeyPredicates()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::replacePivs()
// -----------------------------------------------------------------------
void RoundRobinPartitioningFunction::replacePivs(
       const ValueIdList& newPivs,
       const ValueIdSet& newPartKeyPreds)
{
  // Overwrite the old pivs, part key preds, and part expr. with the new ones.
  storePartitionInputValues(newPivs);
  storePartitioningKeyPredicates(newPartKeyPreds);
} // RoundRobinPartitioningFunction::replacePivs()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitioningFunctionForIndexDesc()
// -----------------------------------------------------------------------
PartitioningFunction*
RoundRobinPartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  Lng32 ixColNumber;
  ValueId keyValueId;
  ValueIdSet partitioningKey;

  for (CollIndex i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id set
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate a new RoundRobinPartitioningFunction.
  // -----------------------------------------------------------------
  RoundRobinPartitioningFunction *partFunc
             = new(idesc->wHeap()) RoundRobinPartitioningFunction
                                   (getCountOfPartitions(),
                                    partitioningKey,
                                    getNodeMap()->copy(idesc->wHeap()));
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // RoundRobinPartitioningFunction::createPartitioningFunctionForIndexDesc()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitioningExpression()
// -----------------------------------------------------------------------
ItemExpr* RoundRobinPartitioningFunction::createPartitioningExpression()
{
  if (getExpression())      // already constructed?
    return getExpression(); // reuse it!

  CollHeap *heap = CmpCommon::statementHeap();

  // The Partitioning Key (SYSKEY)
  //
  ValueIdList keyValue = getPartitioningKey();

  // The type of the partitioning key for round robin is always
  // SQLLargeInt (the type of SYSKEY)
  //
  NAType *desiredType = new (heap) SQLLargeInt(heap, TRUE, FALSE);

  // The layout of the SYSKEY is
  //
  ItemExpr *partKey =
    new (heap)
    Cast(new (heap)
         Shift(ITM_SHIFT_RIGHT,
               new (heap)
               Cast(keyValue[0].getItemExpr(),
                    desiredType),
               new (heap)
               ConstValue(32)),
         new (heap) SQLInt(heap, FALSE,FALSE));

  NAType *numPartsType = new (heap) SQLInt(heap, FALSE,FALSE);

  Lng32 numParts = getCountOfOrigRRPartitions();
  char buffer[20];
  sprintf(buffer, "%d", numParts);

  NAString numPartsStr = buffer;
  ItemExpr *origNumParts =
    new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  numParts = getCountOfPartitions();
  sprintf(buffer, "%d", numParts);

  numPartsStr = buffer;
  ItemExpr *scaledNumParts =
    new (heap) ConstValue(numPartsType, &numParts, sizeof(Lng32), &numPartsStr);

  ItemExpr *partFunc =
    new (heap)
    PAGroup(new (heap)
            ProgDistrib(partKey, origNumParts),
            scaledNumParts,
            origNumParts);

  partFunc->synthTypeAndValueId();
  storeExpression(partFunc);
  return partFunc;
} // RoundRobinPartitioningFunction::createPartitioningExpression()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::createPartitionSelectionExpr()
// Create the partition selection expression.  'Partition selection'
// means that an expression is used to determine the partition to
// access as opposed to using the File System to determine the range
// of partitions to access based on a set of partitioning key
// predicates. Partition selection is currently used for Hash Dist and
// Round Robin Partitioning. And the File System is used for Range
// Partitioning.  If a partitioning selection expression is created,
// it is cached in the data member 'partitionSelectionExpr_' and the
// partition selection inputs are generated and stored in
// 'partitionSelectionExprInputs_'. This method is redefined for
// HashDistPartitioningFunction and RoundRobinPartitioningFunction.
// -----------------------------------------------------------------------
ItemExpr *
RoundRobinPartitioningFunction::
createPartitionSelectionExpr(const SearchKey *partSearchKey,
                             const ValueIdSet &availableValues)
{
  // If it has already been created, return cached version.
  //
  if(partitionSelectionExpr())
    return partitionSelectionExpr();

  CollHeap *heap = CmpCommon::statementHeap();

  // Use a host var to provide access to numParts, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  ItemExpr *numParts = new (heap) HostVar("_sys_hostVarNumParts",
                                          // int not null
                                          new (heap) SQLInt(heap, FALSE, FALSE),
                                          // is system-supplied
                                          TRUE);
  numParts->synthTypeAndValueId();

  // Use a host var to provide access to partNum, this will be
  // mapped to a specific ATP:ATPIndex:offset in PartitionAccess::codeGen()
  //
  ItemExpr *partNum = new (heap) HostVar("_sys_hostVarPartNo",
                                         // int not null
                                         new (heap) SQLInt(heap, FALSE, FALSE),
                                         // is system-supplied
                                         TRUE);
  partNum->synthTypeAndValueId();

  // Record these hostvars as the inputs to the partitionSelectionExpr.
  //
  CMPASSERT(partitionSelectionExprInputs().entries() == 0);
  partitionSelectionExprInputs().insert(partNum->getValueId());
  partitionSelectionExprInputs().insert(numParts->getValueId());

  if(assignPartition()) {
    // If partitionAssignment is to be done, the partition selection
    // expression is Modulus(Cast((partNum+1),SQLInt),numParts).  The
    // cast (to SQLInt) is there because the type synthesized for
    // partNum+1 is a large int, which Modulus can't handle at the
    // moment.  So, we just cast the sum to a SQLInt for now.
    // Probably the best thing to do here is to somehow force the type
    // synthesis for BiArith(ITM_PLUS) so that when we add two SQLInts
    // we get a result of SQLInt, rather than something bigger.  This
    // doesn't follow SQL type synthesis rules, but that is OK, since
    // the expression is generated internally. $$ TBD $$


    // Construct and cache the partitionSelectionExpr for Round Robin
    // (partition assignment).
    //
    partitionSelectionExpr() =
      new (heap) Modulus(new (heap) Cast(new (heap)
                                         BiArith(ITM_PLUS, partNum,
                                                 new (heap) SystemLiteral(1)),
                                         new (heap) SQLInt(heap, TRUE,FALSE)),
                         numParts);

    // Bind the expression.
    //
    partitionSelectionExpr()->synthTypeAndValueId();

    // PreCodeGen the expression (This maybe should go in
    // RoundRobinPartitioningFunction::preCodeGen(), but preCodeGen()
    // is typically called before this expression is generated.
    //
    partitionSelectionExpr()->replaceVEGExpressions(availableValues,
                                                    availableValues);

    return partitionSelectionExpr();

  } else if(partSearchKey->isUnique()) {

    //  For now, only support a partition selection expression when the
    // partition search key is unique (identifies exactly one partition).
    //

    const ValueIdList &keyValues =  partSearchKey->getBeginKeyValues();

    // The type of the partitioning key for round robin is always
    // SQLLargeInt (the type of SYSKEY)
    //
    NAType *desiredType = new (heap) SQLLargeInt(heap,TRUE, FALSE);

    // The partition selection expression is:
    //
    // ProgDistrib(Cast(ShiftRight(Cast(<keyvalue>, SQLLargeInt), 32), SQLInt),
    //             numParts)
    //
    // First, the <keyvalue> is cast to a SQLLargeInt to make it the
    // same type as the partitioning key (SYSKEY).  This value is then
    // shifted right 32 bits to remove the 16 bits of padding
    // introduced for packing and 16 bits of random data used to make
    // sure the key is unique, but not used to compute the partition
    // number.  This is then cast to a SQLInt to make it compatible
    // with ProgDistrib.  Then ProgDistrib calculates the partNum.

    // Construct and cache the partitionSelectionExpr for Round Robin
    // (partition assignment).
    //
    partitionSelectionExpr() =
      new (heap) ProgDistrib(new (heap)
                             Cast(new (heap)
                                  Shift(ITM_SHIFT_RIGHT,
                                        new (heap)
                                        Cast(keyValues[0].getItemExpr(),
                                             desiredType),
                                        new (heap)
                                        SystemLiteral(0x1000)),
                                  new (heap) SQLInt(heap, FALSE,FALSE)),
                             numParts);

    // Bind the expression.
    //
    partitionSelectionExpr()->synthTypeAndValueId();

    // PreCodeGen the expression (This maybe should go in
    // RoundRobinPartitioningFunction::preCodeGen(), but preCodeGen()
    // is typically called before this expression is generated.
    //
    partitionSelectionExpr()->replaceVEGExpressions(availableValues,
                                                    availableValues);

    return partitionSelectionExpr();

  } else {
    return NULL;
  }

} // RoundRobinPartitioningFunction::createPartitionSelectionExpr()

//::scaleNUmberOfPartitions() are called in following locations
//
//  OptPhysRelExpr.cpp
//      3536 NJ::genLeftChild()
//      4892 NJ:createContextForChild() (rightPartFunc->)
//      14166 and 14262 synthDP2PhysicalProperty()
//
//  GP.cpp
//      955 GroupAttributes::recommendedOrderForNJProbing()
//
//As a result, the following version will not be called.

PartitioningFunction *
RoundRobinPartitioningFunction::
scaleNumberOfPartitions(Lng32 &suggestedNewNumberOfPartitions,
                        PartitionGroupingDistEnum partGroupDist)
{
  if (suggestedNewNumberOfPartitions == 1)
    return new(CmpCommon::statementHeap())
      SinglePartitionPartitioningFunction();

  // If an expression has been generated, then we want to discard it
  // because it may no longer be correct.
  storeExpression(NULL);

  // Allow arbitrary scaling of RoundRobinPartitioningFunctions.
  // (The runtime will handle the mapping of physical partitions
  // to logical partitions.)
  //


  if ( suggestedNewNumberOfPartitions != getCountOfPartitions() ) {
     NodeMap* newNodeMap =
       new (heap_) NodeMap(heap_, suggestedNewNumberOfPartitions);
     replaceNodeMap(newNodeMap);
  }

  partitionCount_ = suggestedNewNumberOfPartitions;
  return this;
} // RoundRobinPartitioningFunction::scaleNumberOfPartitions()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
NABoolean
RoundRobinPartitioningFunction::
isAGroupingOf(const PartitioningFunction &other,
              Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  const RoundRobinPartitioningFunction *oth =
    other.castToRoundRobinPartitioningFunction();

  // If other is not a RoundRobinPartitioningFunction, then it cannot
  // be a grouping of...
  //
  if(!oth)
    return FALSE;

  // To be a grouping of, the key column (SYSKEY) must be the same.
  //
  if (! (getPartitioningKey() == oth->getPartitioningKey()))
    return FALSE;


  // If this function has more partitions than other,
  // then it cannot be a grouping of.
  // Eg.  this.numParts: 10 this.origNumParts: 20
  //       oth.numParts: 5   oth.origNumParts: 20
  //
  // If the two functions are not based on the same physical function,
  // then it cannot be a grouping of.
  // Eg.  this.numParts: 10 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 30
  //
  if((getCountOfPartitions() > oth->getCountOfPartitions()) ||
     (getCountOfOrigRRPartitions() != oth->getCountOfOrigRRPartitions())) {
    return FALSE;
  }

  // Here the following is known to be TRUE:
  //
  //  (getCountOfPartitions() <= oth->getCountOfPartitions()
  //
  // AND
  //
  //  (getCountOfOrigRRPartitions() == oth->getCountOfOrigRRPartitions())
  //
  // Eg.  this.numParts: 10 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20

  // If other has not been scaled (allow arbitrary scaling of one function):
  // Eg.  this.numParts:  7 this.origNumParts: 20
  //       oth.numParts: 20  oth.origNumParts: 20
  // OR
  // If they have both been scaled to the same number of partitions:
  // then it is a grouping of.
  // Eg.  this.numParts:  7 this.origNumParts: 20
  //       oth.numParts:  7  oth.origNumParts: 20
  //
  if((oth->getCountOfPartitions() == oth->getCountOfOrigRRPartitions()) ||
     (getCountOfPartitions() == oth->getCountOfPartitions())) {

    if (maxPartsPerGroup != NULL)
      *maxPartsPerGroup =
        ((oth->getCountOfPartitions() + getCountOfPartitions() - 1)
         / getCountOfPartitions());

    return TRUE;
  }

  // WARNING.....  I am not sure if the current code can ever produce
  // a situation that would bring control to here.  Also, I am not
  // sure if the semantics of GROUPING implemented below are correct
  // for these situations.
  //
  // Here the following is known to be TRUE:
  //
  //  both functions have been scaled.  (I DON'T THINK THIS CAN HAPPEN)
  //
  // AND
  //
  //  They are scaled to different sizes.
  //
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts:  7 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //

  // Under these conditions, three things must be true for it to be a
  // grouping of:
  //
  // - the scaled number of partitions must evenly divide the scaled
  //   number of partitions of other
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  // - the other scaling must be a multiple of or evenly divide the
  //   original number of partitions
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 40  oth.origNumParts: 20
  //
  // - this scaling must also be a multiple of or evenly divide the
  //   original number of partitions
  // Eg.  this.numParts:  5 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //
  //      this.numParts: 40 this.origNumParts: 20
  //       oth.numParts: 10  oth.origNumParts: 20
  //

  // If the scaled number of partitions evenly divides the scaled
  // number of partitions of other...
  //
  if((oth->getCountOfPartitions() % getCountOfPartitions()) != 0) {
    return FALSE;
  }

  // AND the other scaling is a multiple of or evenly divides the
  // original number of partitions...
  //
  if(oth->getCountOfOrigRRPartitions() >= oth->getCountOfPartitions()) {
    if(oth->getCountOfOrigRRPartitions() % oth->getCountOfPartitions()) {
      return FALSE;
    }
  } else {
    if(oth->getCountOfPartitions() % oth->getCountOfOrigRRPartitions()) {
      return FALSE;
    }
  }

  // AND this scaling is a multiple of or evenly divides the original
  // number of partitions ...
  //
  if(getCountOfOrigRRPartitions() >= getCountOfPartitions()) {
    if(getCountOfOrigRRPartitions() % getCountOfPartitions()) {
      return FALSE;
    }
  } else {
    if(getCountOfPartitions() % getCountOfOrigRRPartitions()) {
      return FALSE;
    }
  }

  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup =
      ((oth->getCountOfPartitions() + getCountOfPartitions() - 1)
       / getCountOfPartitions());

  // THEN it is a grouping of...
  //
  return TRUE;

} // RoundRobinPartitioningFunction::isAGroupingOf()

// -----------------------------------------------------------------------
// RoundRobinPartitioningFunction::codeGen() is defined in
// generator/GenPartFunc.cpp
// -----------------------------------------------------------------------

// -----------------------------------------------------------------------
// Make a new partSearchKey that indicates that PA_PARTITION_GROUPING
// is being done.  Note that a search key can not be generated which
// can group RR partitions.  For RoundRobinPartitioning, a flag in the
// search key is used to indicate that PA_PARTITION_GROUPING is being
// done and the begin/end key values of the search key are set to the
// partition input values of the partitioning function.
// -----------------------------------------------------------------------
SearchKey *
RoundRobinPartitioningFunction::createSearchKey(const IndexDesc *indexDesc,
                                                ValueIdSet availInputs,
                                                ValueIdSet additionalPreds) const
{
  ValueIdSet preds(getPartitioningKeyPredicates());
  ValueIdSet nonKeyColumnSet; // empty set

  availInputs += getPartitionInputValues();
  preds += additionalPreds;

  // Call this special constructor that constructs a search key for a
  // RoundRobinPartitioningFunction.
  //
  SearchKey *partSearchKey = new (CmpCommon::statementHeap())
    SearchKey(indexDesc->getPartitioningKey(),
              indexDesc->getOrderOfPartitioningKeyValues(),
              availInputs,
              preds,
              this,
              nonKeyColumnSet,
              indexDesc);

  return partSearchKey;
} // RoundRobinPartitioningFunction::createSearchKey()

void RoundRobinPartitioningFunction::setupForStatement()
{
  if(setupForStatement_)
    return;

  PartitioningFunction::setupForStatement();

  setupForStatement_ = TRUE;
  resetAfterStatement_ = FALSE;
}

void RoundRobinPartitioningFunction::resetAfterStatement()
{
  if(resetAfterStatement_)
    return;

  PartitioningFunction::resetAfterStatement();
  partitionCount_ = numberOfOrigRRPartitions_;

  setupForStatement_ = FALSE;
  resetAfterStatement_ = TRUE;
}

const NAString RoundRobinPartitioningFunction::getText() const
{
  NAString result("round robin partitioned ", CmpCommon::statementHeap());
  char nparts[20];

  sprintf(nparts,"%d (%d)", partitionCount_, numberOfOrigRRPartitions_);
  result += nparts;
  result += " ways on (";
  getPartitioningKey().unparse(result,DEFAULT_PHASE,EXPLAIN_FORMAT);
  result += ")";
  return result;
}

void RoundRobinPartitioningFunction::print(
     FILE* ofd,
     const char* indent,
     const char* title) const
{
  PartitioningFunction::print(ofd,indent,"RoundRobinPartitioningFunction");
}


const skewProperty ANY_SKEW_PROPERTY(skewProperty::ANY, NULL);
  
NABoolean skewProperty::operator==(const skewProperty& other) const
{
   if ( indicator_  != other.indicator_ )
       return FALSE;

   if (broadcastOneRow_ != other.broadcastOneRow_)
     return FALSE;

   if ( skewValues_ == other.skewValues_ )
     return TRUE;

    if ( skewValues_ != NULL && other.skewValues_ != NULL &&
         NOT (*skewValues_ == *(other.skewValues_)) )
       return FALSE;

   if ( numESPs_ == other.numESPs_ )
     return TRUE;

   return FALSE;
}

const NAString skewProperty::getText(NABoolean abbre) const
{
  NAString result;
  char esps[20];
  NAString suffix;

  if ( abbre == FALSE && getAntiSkewESPs() > 0 ) {
    suffix += "(via ";
    str_itoa(getAntiSkewESPs(), esps);
    suffix += esps;
    suffix += " ESP(s)) ";
  }

  switch ( getIndicator() )
   {
     case skewProperty::UNIFORM_DISTRIBUTE:

        if ( abbre == FALSE ) {
          result += " - uniformly distribute ";
          result += suffix;
          result += "skewed ";
                  
          if ( skewValues_ ) 
             result += skewValues_->getText();
        } else {
           result += "-ud";
        }
        break;

     case skewProperty::BROADCAST:

        if ( abbre == FALSE ) {
           result += " - broadcast "; 
           result += suffix;
           result += "skewed "; 

           if ( skewValues_ ) 
              result += skewValues_->getText();

	   if (getBroadcastOneRow())
	     result += " and one row ";
        } else {
           result += "-br";
        }
        break;

    default:
      break;
  }

  return result;
}

Int64List* 
SkewedDataPartitioningFunction::buildHashListForSkewedValues()
{
  if ( skewHashList_ )
    return skewHashList_;

  const SkewedValueList* svlist = getSkewProperty().getSkewValues();

  if ( svlist == NULL )
    return NULL;

  skewHashList_ = new (STMTHEAP) Int64List(STMTHEAP, svlist->entries());
  CollHeap *heap = CmpCommon::statementHeap();
  char data[10]; Int32 len = 0;

  ConstValue* cvExp = NULL;
  ItemExpr* expForSkewedValue = NULL;
  ValueIdList exprs;
  UInt32 hashval;

  if ( !svlist->needToComputeFinalHash() ) {

     // for TRUE MC-SB. hash value for the composite skew value is pre-computed.
     for (CollIndex i = 0; i < svlist->entries(); i++) {
        hashval = (UInt32)((*svlist)[i].getDblValue());
        skewHashList_ -> insertAt(i, hashval);
     }

  } else {

     const NAType* naType = svlist->getNAType(); 
     NABoolean useHash = naType->useHashRepresentation();
     UInt32 flags = ExHDPHash::NO_FLAGS;

     for (CollIndex i = 0; i < svlist->entries(); i++) 
     {
        if ( (*svlist)[i].getValue().isNull() ) {
           // an untyped NULL constant. All null values hash to this constant.
           // Ref. module exp_function.cpp, method ExHDPHash::eval() and
           // ex_function_hash::eval().
           hashval =  ExHDPHash::nullHashValue; //666654765;
        } else {
   
            if ( useHash ) {
              // The skew value for any character or exact numeric data type is 
              // the hash itself. So here we just copy and cast it back to UInt32.
              hashval = (UInt32)((*svlist)[i].getDblValue());
            } else {
   
             // If we hit this branch, it means the boundary values are stored in
             // the skew list. We can assume these values can be safely casted
             // back (without loosing precision). Otherwise, storing-hash- 
             // value method should be used.
              (*svlist)[i].outputToBufferToComputeRTHash(naType,data,len,flags);
              hashval = computeHashValue(data, flags, len);
            }
        }
        skewHashList_ -> insertAt(i, hashval);
     }

  }

  return skewHashList_;
}

//============================

HivePartitioningFunction::~HivePartitioningFunction() {}

PartitioningRequirement*
HivePartitioningFunction::makePartitioningRequirement()
{
  return new (CmpCommon::statementHeap())
             RequireHive(this);
}

PartitioningFunction*
HivePartitioningFunction::copy() const
{
  return new (CmpCommon::statementHeap())
    HivePartitioningFunction(*this, CmpCommon::statementHeap());
}

// -----------------------------------------------------------------------
// HivePartitioningFunction::createPartitioningKeyPredicates()
// -----------------------------------------------------------------------
void HivePartitioningFunction::createPartitioningKeyPredicates()
{
  createBetweenPartitioningKeyPredicates("_sys_HostVarLoHivePart",
                                         "_sys_HostVarHiHivePart");

} // HivePartitioningFunction::createPartitioningKeyPredicates()


// -----------------------------------------------------------------------
// HivePartitioningFunction::isAGroupingOf()
// -----------------------------------------------------------------------
  // Right now we assume that the split function of a hash partitioning
  // function is such that no two functions are a grouping of each other.
  // The only exception is identity, of course. We use the base class'
  // implementation.
  //
  // With more knowledge about the split function (e.g. by knowing it's
  // a simple modulus), one could guarantee that a 4-way hash-partitioning
  // scheme is actually a grouping of an 8-way scheme. This is not really
  // necessary and therefore not done here.


ItemExpr *
HivePartitioningFunction::buildHashingExpressionForExpr(ItemExpr* expr) const
{
  return new (CmpCommon::statementHeap()) HiveHash(expr);
}

UInt32 HivePartitioningFunction::computeHashValue(char* data, UInt32 flags, Int32 len) 
{
  // need a Hive()
  // Directly call the implementation function to compute the hash. NULL
  // values and VARCHAR data types are not handled.
  return FastHash(data, len);
}

ItemExpr * HivePartitioningFunction::getHashingExpression() const
{
  // need a Hive()
  ItemExpr* hashExpr = NULL;
  ItemExpr* partExpr = getExpression();
  if ( partExpr ) {
    hashExpr = partExpr->child(0);

    CMPASSERT(hashExpr AND
              hashExpr->getOperatorType()== ITM_HASH);
  }
  return hashExpr;
}


// -----------------------------------------------------------------------
// Method for debugging.
// -----------------------------------------------------------------------
const NAString HivePartitioningFunction::getText() const
{
   return getTextImp("hive"); 
}

void HivePartitioningFunction::print(FILE* ofd, const char* indent,
	   			     const char* title) const
{
  PartitioningFunction::print(ofd, indent, "HivePartitioningFunction");
} // HivePartitioningFunction::print()

PartitioningFunction*
HivePartitioningFunction::
createPartitioningFunctionForIndexDesc(IndexDesc *idesc) const
{
  const NAFileSet * fileSet = idesc->getNAFileSet();
  const NAColumnArray & allColumns = fileSet->getAllColumns();
  const NAColumnArray & partKeyColumns = fileSet->getPartitioningKeyColumns();

  CollIndex ixColNumber;
  ValueId keyValueId;
  ValueIdSet partitioningKey;
  ValueIdList partitioningKeyList;

  for (CollIndex i = 0; i < partKeyColumns.entries(); i++)
    {
      // which column of the index is this (usually this will be == i)
      ixColNumber = allColumns.index(partKeyColumns[i]);

      // insert the value id of the index column into the partitioning
      // key column value id set
      keyValueId = idesc->getIndexColumns()[ixColNumber];
      partitioningKey += keyValueId;
      partitioningKeyList.insertAt(i,keyValueId);
    } // end loop over partitioning key columns

  // -----------------------------------------------------------------
  // Allocate a new HashPartitioningFunction.
  // -----------------------------------------------------------------
  HivePartitioningFunction *partFunc = new(idesc->wHeap())
    HivePartitioningFunction (partitioningKey,
                                  partitioningKeyList,
                                  getCountOfPartitions(),
                                  getNodeMap()->copy(idesc->wHeap()));

  // -----------------------------------------------------------------
  // Construct the partitioning key predicates.
  // -----------------------------------------------------------------
  partFunc->createPartitioningKeyPredicates();

  return partFunc;

} // HivePartitioningFunction::createPartitioningFunctionForIndexDesc()

NABoolean HivePartitioningFunction::isAGroupingOf(
     const PartitioningFunction &other,
     Lng32* maxPartsPerGroup) const
{
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = 1;

  if ( comparePartKeyToKey(other) == INCOMPATIBLE )
    return FALSE;

  // assume we can repartition a hive table with <m> partitions to
  // <n> partitions. No question asked.

  if ( getCountOfPartitions() < other.getCountOfPartitions() &&
       other.getCountOfPartitions() % getCountOfPartitions() != 0 )
    return FALSE;

  if ( other.getCountOfPartitions() < getCountOfPartitions() &&
       getCountOfPartitions() % other.getCountOfPartitions() != 0 )
    return FALSE;

  // This is a grouping of.  Set the maxPartsPerGroup and return TRUE.
  if (maxPartsPerGroup != NULL)
    *maxPartsPerGroup = other.getCountOfPartitions() / getCountOfPartitions();

  return TRUE;

}

void
HivePartitioningFunction::normalizePartitioningKeys(NormWA& normWARef)
{
  HashPartitioningFunction::normalizePartitioningKeys(normWARef);
  keyColumnList_.normalizeNode(normWARef);

  // don't normalize original key col list, avoid VEGies which could
  // cause data type changes.
}

